summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer.py698
-rw-r--r--kafka/consumer/__init__.py6
-rw-r--r--kafka/consumer/base.py169
-rw-r--r--kafka/consumer/multiprocess.py248
-rw-r--r--kafka/consumer/simple.py318
-rw-r--r--kafka/partitioner.py58
-rw-r--r--kafka/partitioner/__init__.py6
-rw-r--r--kafka/partitioner/base.py23
-rw-r--r--kafka/partitioner/hashed.py12
-rw-r--r--kafka/partitioner/roundrobin.py23
-rw-r--r--kafka/producer/__init__.py6
-rw-r--r--kafka/producer/base.py (renamed from kafka/producer.py)115
-rw-r--r--kafka/producer/keyed.py62
-rw-r--r--kafka/producer/simple.py73
-rw-r--r--test/test_consumer_integration.py2
-rw-r--r--test/test_failover_integration.py2
-rw-r--r--test/test_producer.py2
17 files changed, 950 insertions, 873 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
deleted file mode 100644
index 42628e1..0000000
--- a/kafka/consumer.py
+++ /dev/null
@@ -1,698 +0,0 @@
-from __future__ import absolute_import
-
-try:
- from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
-except ImportError: # python 2
- from itertools import izip_longest as izip_longest, repeat
-import logging
-import time
-import numbers
-from threading import Lock
-from multiprocessing import Process, Queue as MPQueue, Event, Value
-
-import six
-
-try:
- from Queue import Empty, Queue
-except ImportError: # python 2
- from queue import Empty, Queue
-
-import kafka.common
-from kafka.common import (
- FetchRequest, OffsetRequest,
- OffsetCommitRequest, OffsetFetchRequest,
- ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
- UnknownTopicOrPartitionError
-)
-
-from kafka.util import ReentrantTimer
-
-log = logging.getLogger("kafka")
-
-AUTO_COMMIT_MSG_COUNT = 100
-AUTO_COMMIT_INTERVAL = 5000
-
-FETCH_DEFAULT_BLOCK_TIMEOUT = 1
-FETCH_MAX_WAIT_TIME = 100
-FETCH_MIN_BYTES = 4096
-FETCH_BUFFER_SIZE_BYTES = 4096
-MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
-
-ITER_TIMEOUT_SECONDS = 60
-NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
-
-
-class FetchContext(object):
- """
- Class for managing the state of a consumer during fetch
- """
- def __init__(self, consumer, block, timeout):
- self.consumer = consumer
- self.block = block
-
- if block:
- if not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
- self.timeout = timeout * 1000
-
- def __enter__(self):
- """Set fetch values based on blocking status"""
- self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
- self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
- if self.block:
- self.consumer.fetch_max_wait_time = self.timeout
- self.consumer.fetch_min_bytes = 1
- else:
- self.consumer.fetch_min_bytes = 0
-
- def __exit__(self, type, value, traceback):
- """Reset values"""
- self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
- self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
-
-
-class Consumer(object):
- """
- Base class to be used by other consumers. Not to be used directly
-
- This base class provides logic for
- * initialization and fetching metadata of partitions
- * Auto-commit logic
- * APIs for fetching pending message count
- """
- def __init__(self, client, group, topic, partitions=None, auto_commit=True,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL):
-
- self.client = client
- self.topic = topic
- self.group = group
- self.client.load_metadata_for_topics(topic)
- self.offsets = {}
-
- if not partitions:
- partitions = self.client.get_partition_ids_for_topic(topic)
- else:
- assert all(isinstance(x, numbers.Integral) for x in partitions)
-
- # Variables for handling offset commits
- self.commit_lock = Lock()
- self.commit_timer = None
- self.count_since_commit = 0
- self.auto_commit = auto_commit
- self.auto_commit_every_n = auto_commit_every_n
- self.auto_commit_every_t = auto_commit_every_t
-
- # Set up the auto-commit timer
- if auto_commit is True and auto_commit_every_t is not None:
- self.commit_timer = ReentrantTimer(auto_commit_every_t,
- self.commit)
- self.commit_timer.start()
-
- if auto_commit:
- self.fetch_last_known_offsets(partitions)
- else:
- for partition in partitions:
- self.offsets[partition] = 0
-
- def fetch_last_known_offsets(self, partitions=None):
- if not partitions:
- partitions = self.client.get_partition_ids_for_topic(self.topic)
-
- def get_or_init_offset(resp):
- try:
- kafka.common.check_error(resp)
- return resp.offset
- except UnknownTopicOrPartitionError:
- return 0
-
- for partition in partitions:
- req = OffsetFetchRequest(self.topic, partition)
- (resp,) = self.client.send_offset_fetch_request(self.group, [req],
- fail_on_error=False)
- self.offsets[partition] = get_or_init_offset(resp)
- self.fetch_offsets = self.offsets.copy()
-
- def commit(self, partitions=None):
- """
- Commit offsets for this consumer
-
- partitions: list of partitions to commit, default is to commit
- all of them
- """
-
- # short circuit if nothing happened. This check is kept outside
- # to prevent un-necessarily acquiring a lock for checking the state
- if self.count_since_commit == 0:
- return
-
- with self.commit_lock:
- # Do this check again, just in case the state has changed
- # during the lock acquiring timeout
- if self.count_since_commit == 0:
- return
-
- reqs = []
- if not partitions: # commit all partitions
- partitions = self.offsets.keys()
-
- for partition in partitions:
- offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: "
- "group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
-
- reqs.append(OffsetCommitRequest(self.topic, partition,
- offset, None))
-
- resps = self.client.send_offset_commit_request(self.group, reqs)
- for resp in resps:
- kafka.common.check_error(resp)
-
- self.count_since_commit = 0
-
- def _auto_commit(self):
- """
- Check if we have to commit based on number of messages and commit
- """
-
- # Check if we are supposed to do an auto-commit
- if not self.auto_commit or self.auto_commit_every_n is None:
- return
-
- if self.count_since_commit >= self.auto_commit_every_n:
- self.commit()
-
- def stop(self):
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
-
- def pending(self, partitions=None):
- """
- Gets the pending message count
-
- partitions: list of partitions to check for, default is to check all
- """
- if not partitions:
- partitions = self.offsets.keys()
-
- total = 0
- reqs = []
-
- for partition in partitions:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
-
- resps = self.client.send_offset_request(reqs)
- for resp in resps:
- partition = resp.partition
- pending = resp.offsets[0]
- offset = self.offsets[partition]
- total += pending - offset - (1 if offset > 0 else 0)
-
- return total
-
-
-class SimpleConsumer(Consumer):
- """
- A simple consumer implementation that consumes all/specified partitions
- for a topic
-
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
- partitions: An optional list of partitions to consume the data from
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- fetch_size_bytes: number of bytes to request in a FetchRequest
- buffer_size: default 4K. Initial number of bytes to tell kafka we
- have available. This will double as needed.
- max_buffer_size: default 16K. Max number of bytes to tell kafka we have
- available. None means no limit.
- iter_timeout: default None. How much time (in seconds) to wait for a
- message in the iterator before exiting. None means no
- timeout, so it will wait forever.
-
- Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will
- reset one another when one is triggered. These triggers simply call the
- commit method on this class. A manual call to commit will also reset
- these triggers
- """
- def __init__(self, client, group, topic, auto_commit=True, partitions=None,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES,
- buffer_size=FETCH_BUFFER_SIZE_BYTES,
- max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None):
- super(SimpleConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
- if max_buffer_size is not None and buffer_size > max_buffer_size:
- raise ValueError("buffer_size (%d) is greater than "
- "max_buffer_size (%d)" %
- (buffer_size, max_buffer_size))
- self.buffer_size = buffer_size
- self.max_buffer_size = max_buffer_size
- self.partition_info = False # Do not return partition info in msgs
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_offsets = self.offsets.copy()
- self.iter_timeout = iter_timeout
- self.queue = Queue()
-
- def __repr__(self):
- return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
- (self.group, self.topic, str(self.offsets.keys()))
-
- def provide_partition_info(self):
- """
- Indicates that partition info must be returned by the consumer
- """
- self.partition_info = True
-
- def seek(self, offset, whence):
- """
- Alter the current offset in the consumer, similar to fseek
-
- offset: how much to modify the offset
- whence: where to modify it from
- 0 is relative to the earliest available offset (head)
- 1 is relative to the current offset
- 2 is relative to the latest known offset (tail)
- """
-
- if whence == 1: # relative to current position
- for partition, _offset in self.offsets.items():
- self.offsets[partition] = _offset + offset
- elif whence in (0, 2): # relative to beginning or end
- # divide the request offset by number of partitions,
- # distribute the remained evenly
- (delta, rem) = divmod(offset, len(self.offsets))
- deltas = {}
- for partition, r in izip_longest(self.offsets.keys(),
- repeat(1, rem), fillvalue=0):
- deltas[partition] = delta + r
-
- reqs = []
- for partition in self.offsets.keys():
- if whence == 0:
- reqs.append(OffsetRequest(self.topic, partition, -2, 1))
- elif whence == 2:
- reqs.append(OffsetRequest(self.topic, partition, -1, 1))
- else:
- pass
-
- resps = self.client.send_offset_request(reqs)
- for resp in resps:
- self.offsets[resp.partition] = \
- resp.offsets[0] + deltas[resp.partition]
- else:
- raise ValueError("Unexpected value for `whence`, %d" % whence)
-
- # Reset queue and fetch offsets since they are invalid
- self.fetch_offsets = self.offsets.copy()
- if self.auto_commit:
- self.count_since_commit += 1
- self.commit()
-
- self.queue = Queue()
-
- def get_messages(self, count=1, block=True, timeout=0.1):
- """
- Fetch the specified number of messages
-
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
- """
- messages = []
- if timeout is not None:
- max_time = time.time() + timeout
-
- new_offsets = {}
- while count > 0 and (timeout is None or timeout > 0):
- result = self._get_message(block, timeout, get_partition_info=True,
- update_offset=False)
- if result:
- partition, message = result
- if self.partition_info:
- messages.append(result)
- else:
- messages.append(message)
- new_offsets[partition] = message.offset + 1
- count -= 1
- else:
- # Ran out of messages for the last request.
- if not block:
- # If we're not blocking, break.
- break
- if timeout is not None:
- # If we're blocking and have a timeout, reduce it to the
- # appropriate value
- timeout = max_time - time.time()
-
- # Update and commit offsets if necessary
- self.offsets.update(new_offsets)
- self.count_since_commit += len(messages)
- self._auto_commit()
- return messages
-
- def get_message(self, block=True, timeout=0.1, get_partition_info=None):
- return self._get_message(block, timeout, get_partition_info)
-
- def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
- update_offset=True):
- """
- If no messages can be fetched, returns None.
- If get_partition_info is None, it defaults to self.partition_info
- If get_partition_info is True, returns (partition, message)
- If get_partition_info is False, returns message
- """
- if self.queue.empty():
- # We're out of messages, go grab some more.
- with FetchContext(self, block, timeout):
- self._fetch()
- try:
- partition, message = self.queue.get_nowait()
-
- if update_offset:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- if get_partition_info is None:
- get_partition_info = self.partition_info
- if get_partition_info:
- return partition, message
- else:
- return message
- except Empty:
- return None
-
- def __iter__(self):
- if self.iter_timeout is None:
- timeout = ITER_TIMEOUT_SECONDS
- else:
- timeout = self.iter_timeout
-
- while True:
- message = self.get_message(True, timeout)
- if message:
- yield message
- elif self.iter_timeout is None:
- # We did not receive any message yet but we don't have a
- # timeout, so give up the CPU for a while before trying again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
- else:
- # Timed out waiting for a message
- break
-
- def _fetch(self):
- # Create fetch request payloads for all the partitions
- partitions = dict((p, self.buffer_size)
- for p in self.fetch_offsets.keys())
- while partitions:
- requests = []
- for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequest(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
- # Send request
- responses = self.client.send_fetch_request(
- requests,
- max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes)
-
- retry_partitions = {}
- for resp in responses:
- partition = resp.partition
- buffer_size = partitions[partition]
- try:
- for message in resp.messages:
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall:
- if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size):
- log.error("Max fetch size %d too small",
- self.max_buffer_size)
- raise
- if self.max_buffer_size is None:
- buffer_size *= 2
- else:
- buffer_size = max(buffer_size * 2,
- self.max_buffer_size)
- log.warn("Fetch size too small, increase to %d (2x) "
- "and retry", buffer_size)
- retry_partitions[partition] = buffer_size
- except ConsumerNoMoreData as e:
- log.debug("Iteration was ended by %r", e)
- except StopIteration:
- # Stop iterating through this partition
- log.debug("Done iterating over partition %s" % partition)
- partitions = retry_partitions
-
-def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
- """
- A child process worker which consumes messages based on the
- notifications given by the controller process
-
- NOTE: Ideally, this should have been a method inside the Consumer
- class. However, multiprocessing module has issues in windows. The
- functionality breaks unless this function is kept outside of a class
- """
-
- # Make the child processes open separate socket connections
- client.reinit()
-
- # We will start consumers without auto-commit. Auto-commit will be
- # done by the master controller process.
- consumer = SimpleConsumer(client, group, topic,
- partitions=chunk,
- auto_commit=False,
- auto_commit_every_n=None,
- auto_commit_every_t=None)
-
- # Ensure that the consumer provides the partition information
- consumer.provide_partition_info()
-
- while True:
- # Wait till the controller indicates us to start consumption
- start.wait()
-
- # If we are asked to quit, do so
- if exit.is_set():
- break
-
- # Consume messages and add them to the queue. If the controller
- # indicates a specific number of messages, follow that advice
- count = 0
-
- message = consumer.get_message()
- if message:
- queue.put(message)
- count += 1
-
- # We have reached the required size. The controller might have
- # more than what he needs. Wait for a while.
- # Without this logic, it is possible that we run into a big
- # loop consuming all available messages before the controller
- # can reset the 'start' event
- if count == size.value:
- pause.wait()
-
- else:
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
-
- consumer.stop()
-
-
-class MultiProcessConsumer(Consumer):
- """
- A consumer implementation that consumes partitions for a topic in
- parallel using multiple processes
-
- client: a connected KafkaClient
- group: a name for this consumer, used for offset storage and must be unique
- topic: the topic to consume
-
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- num_procs: Number of processes to start for consuming messages.
- The available partitions will be divided among these processes
- partitions_per_proc: Number of partitions to be allocated per process
- (overrides num_procs)
-
- Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will
- reset one another when one is triggered. These triggers simply call the
- commit method on this class. A manual call to commit will also reset
- these triggers
- """
- def __init__(self, client, group, topic, auto_commit=True,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- num_procs=1, partitions_per_proc=0):
-
- # Initiate the base consumer class
- super(MultiProcessConsumer, self).__init__(
- client, group, topic,
- partitions=None,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
- # Variables for managing and controlling the data flow from
- # consumer child process to master
- self.queue = MPQueue(1024) # Child consumers dump messages into this
- self.start = Event() # Indicates the consumers to start fetch
- self.exit = Event() # Requests the consumers to shutdown
- self.pause = Event() # Requests the consumers to pause fetch
- self.size = Value('i', 0) # Indicator of number of messages to fetch
-
- partitions = self.offsets.keys()
-
- # If unspecified, start one consumer per partition
- # The logic below ensures that
- # * we do not cross the num_procs limit
- # * we have an even distribution of partitions among processes
- if not partitions_per_proc:
- partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
- if partitions_per_proc < num_procs * 0.5:
- partitions_per_proc += 1
-
- # The final set of chunks
- chunker = lambda *x: [] + list(x)
- chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
-
- self.procs = []
- for chunk in chunks:
- chunk = filter(lambda x: x is not None, chunk)
- args = (client.copy(),
- group, topic, list(chunk),
- self.queue, self.start, self.exit,
- self.pause, self.size)
-
- proc = Process(target=_mp_consume, args=args)
- proc.daemon = True
- proc.start()
- self.procs.append(proc)
-
- def __repr__(self):
- return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
- (self.group, self.topic, len(self.procs))
-
- def stop(self):
- # Set exit and start off all waiting consumers
- self.exit.set()
- self.pause.set()
- self.start.set()
-
- for proc in self.procs:
- proc.join()
- proc.terminate()
-
- super(MultiProcessConsumer, self).stop()
-
- def __iter__(self):
- """
- Iterator to consume the messages available on this consumer
- """
- # Trigger the consumer procs to start off.
- # We will iterate till there are no more messages available
- self.size.value = 0
- self.pause.set()
-
- while True:
- self.start.set()
- try:
- # We will block for a small while so that the consumers get
- # a chance to run and put some messages in the queue
- # TODO: This is a hack and will make the consumer block for
- # at least one second. Need to find a better way of doing this
- partition, message = self.queue.get(block=True, timeout=1)
- except Empty:
- break
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset + 1
- self.start.clear()
- self.count_since_commit += 1
- self._auto_commit()
- yield message
-
- self.start.clear()
-
- def get_messages(self, count=1, block=True, timeout=10):
- """
- Fetch the specified number of messages
-
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block till some messages are fetched.
- timeout: If block is True, the function will block for the specified
- time (in seconds) until count messages is fetched. If None,
- it will block forever.
- """
- messages = []
-
- # Give a size hint to the consumers. Each consumer process will fetch
- # a maximum of "count" messages. This will fetch more messages than
- # necessary, but these will not be committed to kafka. Also, the extra
- # messages can be provided in subsequent runs
- self.size.value = count
- self.pause.clear()
-
- if timeout is not None:
- max_time = time.time() + timeout
-
- new_offsets = {}
- while count > 0 and (timeout is None or timeout > 0):
- # Trigger consumption only if the queue is empty
- # By doing this, we will ensure that consumers do not
- # go into overdrive and keep consuming thousands of
- # messages when the user might need only a few
- if self.queue.empty():
- self.start.set()
-
- try:
- partition, message = self.queue.get(block, timeout)
- except Empty:
- break
-
- messages.append(message)
- new_offsets[partition] = message.offset + 1
- count -= 1
- if timeout is not None:
- timeout = max_time - time.time()
-
- self.size.value = 0
- self.start.clear()
- self.pause.set()
-
- # Update and commit offsets if necessary
- self.offsets.update(new_offsets)
- self.count_since_commit += len(messages)
- self._auto_commit()
-
- return messages
diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py
new file mode 100644
index 0000000..d2fa306
--- /dev/null
+++ b/kafka/consumer/__init__.py
@@ -0,0 +1,6 @@
+from .simple import SimpleConsumer
+from .multiprocess import MultiProcessConsumer
+
+__all__ = [
+ 'SimpleConsumer', 'MultiProcessConsumer'
+]
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
new file mode 100644
index 0000000..506b405
--- /dev/null
+++ b/kafka/consumer/base.py
@@ -0,0 +1,169 @@
+from __future__ import absolute_import
+
+import logging
+import numbers
+from threading import Lock
+
+import kafka.common
+from kafka.common import (
+ OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ UnknownTopicOrPartitionError
+)
+
+from kafka.util import ReentrantTimer
+
+log = logging.getLogger("kafka")
+
+AUTO_COMMIT_MSG_COUNT = 100
+AUTO_COMMIT_INTERVAL = 5000
+
+FETCH_DEFAULT_BLOCK_TIMEOUT = 1
+FETCH_MAX_WAIT_TIME = 100
+FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+
+
+class Consumer(object):
+ """
+ Base class to be used by other consumers. Not to be used directly
+
+ This base class provides logic for
+ * initialization and fetching metadata of partitions
+ * Auto-commit logic
+ * APIs for fetching pending message count
+ """
+ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+
+ self.client = client
+ self.topic = topic
+ self.group = group
+ self.client.load_metadata_for_topics(topic)
+ self.offsets = {}
+
+ if not partitions:
+ partitions = self.client.get_partition_ids_for_topic(topic)
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
+
+ # Variables for handling offset commits
+ self.commit_lock = Lock()
+ self.commit_timer = None
+ self.count_since_commit = 0
+ self.auto_commit = auto_commit
+ self.auto_commit_every_n = auto_commit_every_n
+ self.auto_commit_every_t = auto_commit_every_t
+
+ # Set up the auto-commit timer
+ if auto_commit is True and auto_commit_every_t is not None:
+ self.commit_timer = ReentrantTimer(auto_commit_every_t,
+ self.commit)
+ self.commit_timer.start()
+
+ if auto_commit:
+ self.fetch_last_known_offsets(partitions)
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
+
+ def fetch_last_known_offsets(self, partitions=None):
+ if not partitions:
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+
+ def get_or_init_offset(resp):
+ try:
+ kafka.common.check_error(resp)
+ return resp.offset
+ except UnknownTopicOrPartitionError:
+ return 0
+
+ for partition in partitions:
+ req = OffsetFetchRequest(self.topic, partition)
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
+ fail_on_error=False)
+ self.offsets[partition] = get_or_init_offset(resp)
+ self.fetch_offsets = self.offsets.copy()
+
+ def commit(self, partitions=None):
+ """
+ Commit offsets for this consumer
+
+ partitions: list of partitions to commit, default is to commit
+ all of them
+ """
+
+ # short circuit if nothing happened. This check is kept outside
+ # to prevent un-necessarily acquiring a lock for checking the state
+ if self.count_since_commit == 0:
+ return
+
+ with self.commit_lock:
+ # Do this check again, just in case the state has changed
+ # during the lock acquiring timeout
+ if self.count_since_commit == 0:
+ return
+
+ reqs = []
+ if not partitions: # commit all partitions
+ partitions = self.offsets.keys()
+
+ for partition in partitions:
+ offset = self.offsets[partition]
+ log.debug("Commit offset %d in SimpleConsumer: "
+ "group=%s, topic=%s, partition=%s" %
+ (offset, self.group, self.topic, partition))
+
+ reqs.append(OffsetCommitRequest(self.topic, partition,
+ offset, None))
+
+ resps = self.client.send_offset_commit_request(self.group, reqs)
+ for resp in resps:
+ kafka.common.check_error(resp)
+
+ self.count_since_commit = 0
+
+ def _auto_commit(self):
+ """
+ Check if we have to commit based on number of messages and commit
+ """
+
+ # Check if we are supposed to do an auto-commit
+ if not self.auto_commit or self.auto_commit_every_n is None:
+ return
+
+ if self.count_since_commit >= self.auto_commit_every_n:
+ self.commit()
+
+ def stop(self):
+ if self.commit_timer is not None:
+ self.commit_timer.stop()
+ self.commit()
+
+ def pending(self, partitions=None):
+ """
+ Gets the pending message count
+
+ partitions: list of partitions to check for, default is to check all
+ """
+ if not partitions:
+ partitions = self.offsets.keys()
+
+ total = 0
+ reqs = []
+
+ for partition in partitions:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ partition = resp.partition
+ pending = resp.offsets[0]
+ offset = self.offsets[partition]
+ total += pending - offset - (1 if offset > 0 else 0)
+
+ return total
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
new file mode 100644
index 0000000..912e64b
--- /dev/null
+++ b/kafka/consumer/multiprocess.py
@@ -0,0 +1,248 @@
+from __future__ import absolute_import
+
+import logging
+import time
+from multiprocessing import Process, Queue as MPQueue, Event, Value
+
+try:
+ from Queue import Empty
+except ImportError: # python 2
+ from queue import Empty
+
+from .base import (
+ AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
+ NO_MESSAGES_WAIT_TIME_SECONDS
+)
+from .simple import Consumer, SimpleConsumer
+
+log = logging.getLogger("kafka")
+
+
+def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
+ """
+ A child process worker which consumes messages based on the
+ notifications given by the controller process
+
+ NOTE: Ideally, this should have been a method inside the Consumer
+ class. However, multiprocessing module has issues in windows. The
+ functionality breaks unless this function is kept outside of a class
+ """
+
+ # Make the child processes open separate socket connections
+ client.reinit()
+
+ # We will start consumers without auto-commit. Auto-commit will be
+ # done by the master controller process.
+ consumer = SimpleConsumer(client, group, topic,
+ partitions=chunk,
+ auto_commit=False,
+ auto_commit_every_n=None,
+ auto_commit_every_t=None)
+
+ # Ensure that the consumer provides the partition information
+ consumer.provide_partition_info()
+
+ while True:
+ # Wait till the controller indicates us to start consumption
+ start.wait()
+
+ # If we are asked to quit, do so
+ if exit.is_set():
+ break
+
+ # Consume messages and add them to the queue. If the controller
+ # indicates a specific number of messages, follow that advice
+ count = 0
+
+ message = consumer.get_message()
+ if message:
+ queue.put(message)
+ count += 1
+
+ # We have reached the required size. The controller might have
+ # more than what he needs. Wait for a while.
+ # Without this logic, it is possible that we run into a big
+ # loop consuming all available messages before the controller
+ # can reset the 'start' event
+ if count == size.value:
+ pause.wait()
+
+ else:
+ # In case we did not receive any message, give up the CPU for
+ # a while before we try again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+
+ consumer.stop()
+
+
+class MultiProcessConsumer(Consumer):
+ """
+ A consumer implementation that consumes partitions for a topic in
+ parallel using multiple processes
+
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ num_procs: Number of processes to start for consuming messages.
+ The available partitions will be divided among these processes
+ partitions_per_proc: Number of partitions to be allocated per process
+ (overrides num_procs)
+
+ Auto commit details:
+ If both auto_commit_every_n and auto_commit_every_t are set, they will
+ reset one another when one is triggered. These triggers simply call the
+ commit method on this class. A manual call to commit will also reset
+ these triggers
+ """
+ def __init__(self, client, group, topic, auto_commit=True,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+ num_procs=1, partitions_per_proc=0):
+
+ # Initiate the base consumer class
+ super(MultiProcessConsumer, self).__init__(
+ client, group, topic,
+ partitions=None,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
+
+ # Variables for managing and controlling the data flow from
+ # consumer child process to master
+ self.queue = MPQueue(1024) # Child consumers dump messages into this
+ self.start = Event() # Indicates the consumers to start fetch
+ self.exit = Event() # Requests the consumers to shutdown
+ self.pause = Event() # Requests the consumers to pause fetch
+ self.size = Value('i', 0) # Indicator of number of messages to fetch
+
+ partitions = self.offsets.keys()
+
+ # If unspecified, start one consumer per partition
+ # The logic below ensures that
+ # * we do not cross the num_procs limit
+ # * we have an even distribution of partitions among processes
+ if not partitions_per_proc:
+ partitions_per_proc = round(len(partitions) * 1.0 / num_procs)
+ if partitions_per_proc < num_procs * 0.5:
+ partitions_per_proc += 1
+
+ # The final set of chunks
+ chunker = lambda *x: [] + list(x)
+ chunks = map(chunker, *[iter(partitions)] * int(partitions_per_proc))
+
+ self.procs = []
+ for chunk in chunks:
+ chunk = filter(lambda x: x is not None, chunk)
+ args = (client.copy(),
+ group, topic, list(chunk),
+ self.queue, self.start, self.exit,
+ self.pause, self.size)
+
+ proc = Process(target=_mp_consume, args=args)
+ proc.daemon = True
+ proc.start()
+ self.procs.append(proc)
+
+ def __repr__(self):
+ return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
+ (self.group, self.topic, len(self.procs))
+
+ def stop(self):
+ # Set exit and start off all waiting consumers
+ self.exit.set()
+ self.pause.set()
+ self.start.set()
+
+ for proc in self.procs:
+ proc.join()
+ proc.terminate()
+
+ super(MultiProcessConsumer, self).stop()
+
+ def __iter__(self):
+ """
+ Iterator to consume the messages available on this consumer
+ """
+ # Trigger the consumer procs to start off.
+ # We will iterate till there are no more messages available
+ self.size.value = 0
+ self.pause.set()
+
+ while True:
+ self.start.set()
+ try:
+ # We will block for a small while so that the consumers get
+ # a chance to run and put some messages in the queue
+ # TODO: This is a hack and will make the consumer block for
+ # at least one second. Need to find a better way of doing this
+ partition, message = self.queue.get(block=True, timeout=1)
+ except Empty:
+ break
+
+ # Count, check and commit messages if necessary
+ self.offsets[partition] = message.offset + 1
+ self.start.clear()
+ self.count_since_commit += 1
+ self._auto_commit()
+ yield message
+
+ self.start.clear()
+
+ def get_messages(self, count=1, block=True, timeout=10):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
+ """
+ messages = []
+
+ # Give a size hint to the consumers. Each consumer process will fetch
+ # a maximum of "count" messages. This will fetch more messages than
+ # necessary, but these will not be committed to kafka. Also, the extra
+ # messages can be provided in subsequent runs
+ self.size.value = count
+ self.pause.clear()
+
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ # Trigger consumption only if the queue is empty
+ # By doing this, we will ensure that consumers do not
+ # go into overdrive and keep consuming thousands of
+ # messages when the user might need only a few
+ if self.queue.empty():
+ self.start.set()
+
+ try:
+ partition, message = self.queue.get(block, timeout)
+ except Empty:
+ break
+
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
+ count -= 1
+ if timeout is not None:
+ timeout = max_time - time.time()
+
+ self.size.value = 0
+ self.start.clear()
+ self.pause.set()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
+ return messages
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
new file mode 100644
index 0000000..dcc71a9
--- /dev/null
+++ b/kafka/consumer/simple.py
@@ -0,0 +1,318 @@
+from __future__ import absolute_import
+
+try:
+ from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
+except ImportError: # python 2
+ from itertools import izip_longest as izip_longest, repeat
+import logging
+import time
+
+import six
+
+try:
+ from Queue import Empty, Queue
+except ImportError: # python 2
+ from queue import Empty, Queue
+
+from kafka.common import (
+ FetchRequest, OffsetRequest,
+ ConsumerFetchSizeTooSmall, ConsumerNoMoreData
+)
+from .base import (
+ Consumer,
+ FETCH_DEFAULT_BLOCK_TIMEOUT,
+ AUTO_COMMIT_MSG_COUNT,
+ AUTO_COMMIT_INTERVAL,
+ FETCH_MIN_BYTES,
+ FETCH_BUFFER_SIZE_BYTES,
+ MAX_FETCH_BUFFER_SIZE_BYTES,
+ FETCH_MAX_WAIT_TIME,
+ ITER_TIMEOUT_SECONDS,
+ NO_MESSAGES_WAIT_TIME_SECONDS
+)
+
+log = logging.getLogger("kafka")
+
+class FetchContext(object):
+ """
+ Class for managing the state of a consumer during fetch
+ """
+ def __init__(self, consumer, block, timeout):
+ self.consumer = consumer
+ self.block = block
+
+ if block:
+ if not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+ self.timeout = timeout * 1000
+
+ def __enter__(self):
+ """Set fetch values based on blocking status"""
+ self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
+ self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
+ if self.block:
+ self.consumer.fetch_max_wait_time = self.timeout
+ self.consumer.fetch_min_bytes = 1
+ else:
+ self.consumer.fetch_min_bytes = 0
+
+ def __exit__(self, type, value, traceback):
+ """Reset values"""
+ self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
+ self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
+
+
+class SimpleConsumer(Consumer):
+ """
+ A simple consumer implementation that consumes all/specified partitions
+ for a topic
+
+ client: a connected KafkaClient
+ group: a name for this consumer, used for offset storage and must be unique
+ topic: the topic to consume
+ partitions: An optional list of partitions to consume the data from
+
+ auto_commit: default True. Whether or not to auto commit the offsets
+ auto_commit_every_n: default 100. How many messages to consume
+ before a commit
+ auto_commit_every_t: default 5000. How much time (in milliseconds) to
+ wait before commit
+ fetch_size_bytes: number of bytes to request in a FetchRequest
+ buffer_size: default 4K. Initial number of bytes to tell kafka we
+ have available. This will double as needed.
+ max_buffer_size: default 16K. Max number of bytes to tell kafka we have
+ available. None means no limit.
+ iter_timeout: default None. How much time (in seconds) to wait for a
+ message in the iterator before exiting. None means no
+ timeout, so it will wait forever.
+
+ Auto commit details:
+ If both auto_commit_every_n and auto_commit_every_t are set, they will
+ reset one another when one is triggered. These triggers simply call the
+ commit method on this class. A manual call to commit will also reset
+ these triggers
+ """
+ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL,
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES,
+ max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
+ iter_timeout=None):
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
+
+ if max_buffer_size is not None and buffer_size > max_buffer_size:
+ raise ValueError("buffer_size (%d) is greater than "
+ "max_buffer_size (%d)" %
+ (buffer_size, max_buffer_size))
+ self.buffer_size = buffer_size
+ self.max_buffer_size = max_buffer_size
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = fetch_size_bytes
+ self.fetch_offsets = self.offsets.copy()
+ self.iter_timeout = iter_timeout
+ self.queue = Queue()
+
+ def __repr__(self):
+ return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
+ (self.group, self.topic, str(self.offsets.keys()))
+
+ def provide_partition_info(self):
+ """
+ Indicates that partition info must be returned by the consumer
+ """
+ self.partition_info = True
+
+ def seek(self, offset, whence):
+ """
+ Alter the current offset in the consumer, similar to fseek
+
+ offset: how much to modify the offset
+ whence: where to modify it from
+ 0 is relative to the earliest available offset (head)
+ 1 is relative to the current offset
+ 2 is relative to the latest known offset (tail)
+ """
+
+ if whence == 1: # relative to current position
+ for partition, _offset in self.offsets.items():
+ self.offsets[partition] = _offset + offset
+ elif whence in (0, 2): # relative to beginning or end
+ # divide the request offset by number of partitions,
+ # distribute the remained evenly
+ (delta, rem) = divmod(offset, len(self.offsets))
+ deltas = {}
+ for partition, r in izip_longest(self.offsets.keys(),
+ repeat(1, rem), fillvalue=0):
+ deltas[partition] = delta + r
+
+ reqs = []
+ for partition in self.offsets.keys():
+ if whence == 0:
+ reqs.append(OffsetRequest(self.topic, partition, -2, 1))
+ elif whence == 2:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+ else:
+ pass
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ self.offsets[resp.partition] = \
+ resp.offsets[0] + deltas[resp.partition]
+ else:
+ raise ValueError("Unexpected value for `whence`, %d" % whence)
+
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ if self.auto_commit:
+ self.count_since_commit += 1
+ self.commit()
+
+ self.queue = Queue()
+
+ def get_messages(self, count=1, block=True, timeout=0.1):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If block is True, the function will block for the specified
+ time (in seconds) until count messages is fetched. If None,
+ it will block forever.
+ """
+ messages = []
+ if timeout is not None:
+ max_time = time.time() + timeout
+
+ new_offsets = {}
+ while count > 0 and (timeout is None or timeout > 0):
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
+ count -= 1
+ else:
+ # Ran out of messages for the last request.
+ if not block:
+ # If we're not blocking, break.
+ break
+ if timeout is not None:
+ # If we're blocking and have a timeout, reduce it to the
+ # appropriate value
+ timeout = max_time - time.time()
+
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+ return messages
+
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
+ """
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
+ """
+ if self.queue.empty():
+ # We're out of messages, go grab some more.
+ with FetchContext(self, block, timeout):
+ self._fetch()
+ try:
+ partition, message = self.queue.get_nowait()
+
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
+ except Empty:
+ return None
+
+ def __iter__(self):
+ if self.iter_timeout is None:
+ timeout = ITER_TIMEOUT_SECONDS
+ else:
+ timeout = self.iter_timeout
+
+ while True:
+ message = self.get_message(True, timeout)
+ if message:
+ yield message
+ elif self.iter_timeout is None:
+ # We did not receive any message yet but we don't have a
+ # timeout, so give up the CPU for a while before trying again
+ time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
+ else:
+ # Timed out waiting for a message
+ break
+
+ def _fetch(self):
+ # Create fetch request payloads for all the partitions
+ partitions = dict((p, self.buffer_size)
+ for p in self.fetch_offsets.keys())
+ while partitions:
+ requests = []
+ for partition, buffer_size in six.iteritems(partitions):
+ requests.append(FetchRequest(self.topic, partition,
+ self.fetch_offsets[partition],
+ buffer_size))
+ # Send request
+ responses = self.client.send_fetch_request(
+ requests,
+ max_wait_time=int(self.fetch_max_wait_time),
+ min_bytes=self.fetch_min_bytes)
+
+ retry_partitions = {}
+ for resp in responses:
+ partition = resp.partition
+ buffer_size = partitions[partition]
+ try:
+ for message in resp.messages:
+ # Put the message in our queue
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
+ except ConsumerFetchSizeTooSmall:
+ if (self.max_buffer_size is not None and
+ buffer_size == self.max_buffer_size):
+ log.error("Max fetch size %d too small",
+ self.max_buffer_size)
+ raise
+ if self.max_buffer_size is None:
+ buffer_size *= 2
+ else:
+ buffer_size = max(buffer_size * 2,
+ self.max_buffer_size)
+ log.warn("Fetch size too small, increase to %d (2x) "
+ "and retry", buffer_size)
+ retry_partitions[partition] = buffer_size
+ except ConsumerNoMoreData as e:
+ log.debug("Iteration was ended by %r", e)
+ except StopIteration:
+ # Stop iterating through this partition
+ log.debug("Done iterating over partition %s" % partition)
+ partitions = retry_partitions
diff --git a/kafka/partitioner.py b/kafka/partitioner.py
deleted file mode 100644
index 695dd6f..0000000
--- a/kafka/partitioner.py
+++ /dev/null
@@ -1,58 +0,0 @@
-from itertools import cycle
-
-
-class Partitioner(object):
- """
- Base class for a partitioner
- """
- def __init__(self, partitions):
- """
- Initialize the partitioner
-
- partitions - A list of available partitions (during startup)
- """
- self.partitions = partitions
-
- def partition(self, key, partitions):
- """
- Takes a string key and num_partitions as argument and returns
- a partition to be used for the message
-
- partitions - The list of partitions is passed in every call. This
- may look like an overhead, but it will be useful
- (in future) when we handle cases like rebalancing
- """
- raise NotImplementedError('partition function has to be implemented')
-
-
-class RoundRobinPartitioner(Partitioner):
- """
- Implements a round robin partitioner which sends data to partitions
- in a round robin fashion
- """
- def __init__(self, partitions):
- super(RoundRobinPartitioner, self).__init__(partitions)
- self.iterpart = cycle(partitions)
-
- def _set_partitions(self, partitions):
- self.partitions = partitions
- self.iterpart = cycle(partitions)
-
- def partition(self, key, partitions):
- # Refresh the partition list if necessary
- if self.partitions != partitions:
- self._set_partitions(partitions)
-
- return next(self.iterpart)
-
-
-class HashedPartitioner(Partitioner):
- """
- Implements a partitioner which selects the target partition based on
- the hash of the key
- """
- def partition(self, key, partitions):
- size = len(partitions)
- idx = hash(key) % size
-
- return partitions[idx]
diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py
new file mode 100644
index 0000000..fdb19bb
--- /dev/null
+++ b/kafka/partitioner/__init__.py
@@ -0,0 +1,6 @@
+from .roundrobin import RoundRobinPartitioner
+from .hashed import HashedPartitioner
+
+__all__ = [
+ 'RoundRobinPartitioner', 'HashedPartitioner'
+]
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
new file mode 100644
index 0000000..c62b7ed
--- /dev/null
+++ b/kafka/partitioner/base.py
@@ -0,0 +1,23 @@
+
+class Partitioner(object):
+ """
+ Base class for a partitioner
+ """
+ def __init__(self, partitions):
+ """
+ Initialize the partitioner
+
+ partitions - A list of available partitions (during startup)
+ """
+ self.partitions = partitions
+
+ def partition(self, key, partitions):
+ """
+ Takes a string key and num_partitions as argument and returns
+ a partition to be used for the message
+
+ partitions - The list of partitions is passed in every call. This
+ may look like an overhead, but it will be useful
+ (in future) when we handle cases like rebalancing
+ """
+ raise NotImplementedError('partition function has to be implemented')
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
new file mode 100644
index 0000000..587a3de
--- /dev/null
+++ b/kafka/partitioner/hashed.py
@@ -0,0 +1,12 @@
+from .base import Partitioner
+
+class HashedPartitioner(Partitioner):
+ """
+ Implements a partitioner which selects the target partition based on
+ the hash of the key
+ """
+ def partition(self, key, partitions):
+ size = len(partitions)
+ idx = hash(key) % size
+
+ return partitions[idx]
diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py
new file mode 100644
index 0000000..54d00da
--- /dev/null
+++ b/kafka/partitioner/roundrobin.py
@@ -0,0 +1,23 @@
+from itertools import cycle
+
+from .base import Partitioner
+
+class RoundRobinPartitioner(Partitioner):
+ """
+ Implements a round robin partitioner which sends data to partitions
+ in a round robin fashion
+ """
+ def __init__(self, partitions):
+ super(RoundRobinPartitioner, self).__init__(partitions)
+ self.iterpart = cycle(partitions)
+
+ def _set_partitions(self, partitions):
+ self.partitions = partitions
+ self.iterpart = cycle(partitions)
+
+ def partition(self, key, partitions):
+ # Refresh the partition list if necessary
+ if self.partitions != partitions:
+ self._set_partitions(partitions)
+
+ return next(self.iterpart)
diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py
new file mode 100644
index 0000000..bc0e7c6
--- /dev/null
+++ b/kafka/producer/__init__.py
@@ -0,0 +1,6 @@
+from .simple import SimpleProducer
+from .keyed import KeyedProducer
+
+__all__ = [
+ 'SimpleProducer', 'KeyedProducer'
+]
diff --git a/kafka/producer.py b/kafka/producer/base.py
index f186649..6c91364 100644
--- a/kafka/producer.py
+++ b/kafka/producer/base.py
@@ -2,23 +2,19 @@ from __future__ import absolute_import
import logging
import time
-import random
try:
from queue import Empty
except ImportError:
from Queue import Empty
from collections import defaultdict
-from itertools import cycle
from multiprocessing import Queue, Process
import six
-from six.moves import xrange
from kafka.common import (
- ProduceRequest, TopicAndPartition, UnsupportedCodecError, UnknownTopicOrPartitionError
+ ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
-from kafka.partitioner import HashedPartitioner
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
log = logging.getLogger("kafka")
@@ -208,112 +204,3 @@ class Producer(object):
if self.proc.is_alive():
self.proc.terminate()
-
-
-class SimpleProducer(Producer):
- """
- A simple, round-robin producer. Each message goes to exactly one partition
-
- Params:
- client - The Kafka client instance to use
- async - If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- req_acks - A value indicating the acknowledgements that the server must
- receive before responding to the request
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
- random_start - If true, randomize the initial partition which the
- the first message block will be published to, otherwise
- if false, the first message block will always publish
- to partition 0 before cycling through each partition
- """
- def __init__(self, client, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- random_start=False):
- self.partition_cycles = {}
- self.random_start = random_start
- super(SimpleProducer, self).__init__(client, async, req_acks,
- ack_timeout, codec, batch_send,
- batch_send_every_n,
- batch_send_every_t)
-
- def _next_partition(self, topic):
- if topic not in self.partition_cycles:
- if not self.client.has_metadata_for_topic(topic):
- self.client.load_metadata_for_topics(topic)
-
- self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
-
- # Randomize the initial partition that is returned
- if self.random_start:
- num_partitions = len(self.client.get_partition_ids_for_topic(topic))
- for _ in xrange(random.randint(0, num_partitions-1)):
- next(self.partition_cycles[topic])
-
- return next(self.partition_cycles[topic])
-
- def send_messages(self, topic, *msg):
- partition = self._next_partition(topic)
- return super(SimpleProducer, self).send_messages(topic, partition, *msg)
-
- def __repr__(self):
- return '<SimpleProducer batch=%s>' % self.async
-
-
-class KeyedProducer(Producer):
- """
- A producer which distributes messages to partitions based on the key
-
- Args:
- client - The kafka client instance
- partitioner - A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner
- async - If True, the messages are sent asynchronously via another
- thread (process). We will not wait for a response to these
- ack_timeout - Value (in milliseconds) indicating a timeout for waiting
- for an acknowledgement
- batch_send - If True, messages are send in batches
- batch_send_every_n - If set, messages are send in batches of this size
- batch_send_every_t - If set, messages are send after this timeout
- """
- def __init__(self, client, partitioner=None, async=False,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
- codec=None,
- batch_send=False,
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- if not partitioner:
- partitioner = HashedPartitioner
- self.partitioner_class = partitioner
- self.partitioners = {}
-
- super(KeyedProducer, self).__init__(client, async, req_acks,
- ack_timeout, codec, batch_send,
- batch_send_every_n,
- batch_send_every_t)
-
- def _next_partition(self, topic, key):
- if topic not in self.partitioners:
- if not self.client.has_metadata_for_topic(topic):
- self.client.load_metadata_for_topics(topic)
-
- self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
-
- partitioner = self.partitioners[topic]
- return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
-
- def send(self, topic, key, msg):
- partition = self._next_partition(topic, key)
- return self.send_messages(topic, partition, msg)
-
- def __repr__(self):
- return '<KeyedProducer batch=%s>' % self.async
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
new file mode 100644
index 0000000..d2311c6
--- /dev/null
+++ b/kafka/producer/keyed.py
@@ -0,0 +1,62 @@
+from __future__ import absolute_import
+
+import logging
+
+from kafka.partitioner import HashedPartitioner
+from .base import (
+ Producer, BATCH_SEND_DEFAULT_INTERVAL,
+ BATCH_SEND_MSG_COUNT
+)
+
+log = logging.getLogger("kafka")
+
+
+class KeyedProducer(Producer):
+ """
+ A producer which distributes messages to partitions based on the key
+
+ Args:
+ client - The kafka client instance
+ partitioner - A partitioner class that will be used to get the partition
+ to send the message to. Must be derived from Partitioner
+ async - If True, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
+ """
+ def __init__(self, client, partitioner=None, async=False,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+ codec=None,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+ if not partitioner:
+ partitioner = HashedPartitioner
+ self.partitioner_class = partitioner
+ self.partitioners = {}
+
+ super(KeyedProducer, self).__init__(client, async, req_acks,
+ ack_timeout, codec, batch_send,
+ batch_send_every_n,
+ batch_send_every_t)
+
+ def _next_partition(self, topic, key):
+ if topic not in self.partitioners:
+ if not self.client.has_metadata_for_topic(topic):
+ self.client.load_metadata_for_topics(topic)
+
+ self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
+
+ partitioner = self.partitioners[topic]
+ return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))
+
+ def send(self, topic, key, msg):
+ partition = self._next_partition(topic, key)
+ return self.send_messages(topic, partition, msg)
+
+ def __repr__(self):
+ return '<KeyedProducer batch=%s>' % self.async
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
new file mode 100644
index 0000000..a10fa8c
--- /dev/null
+++ b/kafka/producer/simple.py
@@ -0,0 +1,73 @@
+from __future__ import absolute_import
+
+import logging
+import random
+
+from itertools import cycle
+
+from six.moves import xrange
+
+from .base import (
+ Producer, BATCH_SEND_DEFAULT_INTERVAL,
+ BATCH_SEND_MSG_COUNT
+)
+
+log = logging.getLogger("kafka")
+
+
+class SimpleProducer(Producer):
+ """
+ A simple, round-robin producer. Each message goes to exactly one partition
+
+ Params:
+ client - The Kafka client instance to use
+ async - If True, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
+ random_start - If true, randomize the initial partition which the
+ the first message block will be published to, otherwise
+ if false, the first message block will always publish
+ to partition 0 before cycling through each partition
+ """
+ def __init__(self, client, async=False,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+ codec=None,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+ random_start=False):
+ self.partition_cycles = {}
+ self.random_start = random_start
+ super(SimpleProducer, self).__init__(client, async, req_acks,
+ ack_timeout, codec, batch_send,
+ batch_send_every_n,
+ batch_send_every_t)
+
+ def _next_partition(self, topic):
+ if topic not in self.partition_cycles:
+ if not self.client.has_metadata_for_topic(topic):
+ self.client.load_metadata_for_topics(topic)
+
+ self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
+
+ # Randomize the initial partition that is returned
+ if self.random_start:
+ num_partitions = len(self.client.get_partition_ids_for_topic(topic))
+ for _ in xrange(random.randint(0, num_partitions-1)):
+ next(self.partition_cycles[topic])
+
+ return next(self.partition_cycles[topic])
+
+ def send_messages(self, topic, *msg):
+ partition = self._next_partition(topic)
+ return super(SimpleProducer, self).send_messages(topic, partition, *msg)
+
+ def __repr__(self):
+ return '<SimpleProducer batch=%s>' % self.async
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 6895c0e..717eac5 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -4,7 +4,7 @@ from six.moves import xrange
from kafka import SimpleConsumer, MultiProcessConsumer, create_message
from kafka.common import ProduceRequest, ConsumerFetchSizeTooSmall
-from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
+from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index b572b4e..d307d41 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -6,7 +6,7 @@ from . import unittest
from kafka import KafkaClient, SimpleConsumer
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
-from kafka.producer import Producer
+from kafka.producer.base import Producer
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
diff --git a/test/test_producer.py b/test/test_producer.py
index 1375525..caf8fe3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -5,7 +5,7 @@ import logging
from mock import MagicMock
from . import unittest
-from kafka.producer import Producer
+from kafka.producer.base import Producer
class TestKafkaProducer(unittest.TestCase):
def test_producer_message_types(self):