author    Dana Powers <dana.powers@gmail.com>  2014-01-30 17:00:55 -0800
committer Dana Powers <dana.powers@gmail.com>  2014-01-30 17:00:55 -0800
commit    4abf7ee1fbbdc47c8cb7b35f2600e58f1f95e6bb
tree      a670e3399b44ff66ea730a4a53513af178cd56e5
parent    c9d9d0aad2447bb8bad0e62c97365e5101001e4b
parent    f6df696e0ab11ec931283dcca8c518cd54d57687
Merge pull request #111 from rdiomar/multitopic_producers
Make producers take a topic argument at send time rather than init time. This fixes Issue #110 but breaks backwards compatibility with the previous Producer interface.
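
To illustrate the new interface, here is a minimal usage sketch (not part of the commit; the broker address and topic names are assumptions, and the client is constructed as in the README of this era):

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer

    kafka = KafkaClient("localhost", 9092)  # assumed local broker
    producer = SimpleProducer(kafka)        # no topic argument at init time any more

    # The topic is now passed to send_messages() itself, so one
    # producer instance can write to any number of topics.
    producer.send_messages("my-topic", "first message")
    producer.send_messages("my-other-topic", "another message")

Under the old interface the topic was fixed at construction, e.g. SimpleProducer(kafka, "my-topic") followed by producer.send_messages("first message"), which is why existing callers break.
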
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/producer.py  79
1 file changed, 44 insertions(+), 35 deletions(-)
diff --git a/kafka/producer.py b/kafka/producer.py
index 6b624f2..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
 from itertools import cycle
 from multiprocessing import Queue, Process
 
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
 from kafka.partitioner import HashedPartitioner
 from kafka.protocol import create_message
 
@@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
 
 STOP_ASYNC_PRODUCER = -1
 
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
                    req_acks, ack_timeout):
     """
     Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
         # timeout is reached
         while count > 0 and timeout >= 0:
             try:
-                partition, msg = queue.get(timeout=timeout)
+                topic_partition, msg = queue.get(timeout=timeout)
+
             except Empty:
                 break
 
             # Check if the controller has requested us to stop
-            if partition == STOP_ASYNC_PRODUCER:
+            if topic_partition == STOP_ASYNC_PRODUCER:
                 stop = True
                 break
 
             # Adjust the timeout to match the remaining period
             count -= 1
             timeout = send_at - time.time()
-            msgset[partition].append(msg)
+            msgset[topic_partition].append(msg)
 
         # Send collected requests upstream
         reqs = []
-        for partition, messages in msgset.items():
-            req = ProduceRequest(topic, partition, messages)
+        for topic_partition, messages in msgset.items():
+            req = ProduceRequest(topic_partition.topic,
+                                 topic_partition.partition,
+                                 messages)
             reqs.append(req)
 
         try:
@@ -78,7 +81,6 @@ class Producer(object):
 
     Params:
     client - The Kafka client instance to use
-    topic - The topic for sending messages to
    async - If set to true, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@ class Producer(object):
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
             self.proc = Process(target=_send_upstream,
-                                args=(self.topic,
-                                      self.queue,
+                                args=(self.queue,
                                       self.client.copy(),
                                       batch_send_every_t,
                                       batch_send_every_n,
@@ -131,17 +132,18 @@ class Producer(object):
             self.proc.daemon = True
             self.proc.start()
 
-    def send_messages(self, partition, *msg):
+    def send_messages(self, topic, partition, *msg):
         """
         Helper method to send produce requests
         """
         if self.async:
             for m in msg:
-                self.queue.put((partition, create_message(m)))
+                self.queue.put((TopicAndPartition(topic, partition),
+                                create_message(m)))
             resp = []
         else:
             messages = [create_message(m) for m in msg]
-            req = ProduceRequest(self.topic, partition, messages)
+            req = ProduceRequest(topic, partition, messages)
             try:
                 resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                         timeout=self.ack_timeout)
@@ -169,7 +171,6 @@ class SimpleProducer(Producer):
 
     Params:
    client - The Kafka client instance to use
-    topic - The topic for sending messages to
    async - If True, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, async=False,
+    def __init__(self, client, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-        self.next_partition = cycle(client.topic_partitions[topic])
-
+        self.partition_cycles = {}
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t)
 
-    def send_messages(self, *msg):
-        partition = self.next_partition.next()
-        return super(SimpleProducer, self).send_messages(partition, *msg)
+    def _next_partition(self, topic):
+        if topic not in self.partition_cycles:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+        return self.partition_cycles[topic].next()
+
+    def send_messages(self, topic, *msg):
+        partition = self._next_partition(topic)
+        return super(SimpleProducer, self).send_messages(topic, partition, *msg)
 
     def __repr__(self):
-        return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<SimpleProducer batch=%s>' % self.async
 
 
 class KeyedProducer(Producer):
@@ -209,7 +214,6 @@ class KeyedProducer(Producer):
 
     Args:
    client - The kafka client instance
-    topic - The kafka topic to send messages to
    partitioner - A partitioner class that will be used to get the partition
        to send the message to. Must be derived from Partitioner
    async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
     batch_send_every_n - If set, messages are send in batches of this size
     batch_send_every_t - If set, messages are send after this timeout
     """
-    def __init__(self, client, topic, partitioner=None, async=False,
+    def __init__(self, client, partitioner=None, async=False,
                  req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
-        self.topic = topic
-        client.load_metadata_for_topics(topic)
-
         if not partitioner:
             partitioner = HashedPartitioner
-
-        self.partitioner = partitioner(client.topic_partitions[topic])
+        self.partitioner_class = partitioner
+        self.partitioners = {}
 
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t)
 
-    def send(self, key, msg):
-        partitions = self.client.topic_partitions[self.topic]
-        partition = self.partitioner.partition(key, partitions)
-        return self.send_messages(partition, msg)
+    def _next_partition(self, topic, key):
+        if topic not in self.partitioners:
+            if topic not in self.client.topic_partitions:
+                self.client.load_metadata_for_topics(topic)
+            self.partitioners[topic] = \
+                self.partitioner_class(self.client.topic_partitions[topic])
+        partitioner = self.partitioners[topic]
+        return partitioner.partition(key, self.client.topic_partitions[topic])
+
+    def send(self, topic, key, msg):
+        partition = self._next_partition(topic, key)
+        return self.send_messages(topic, partition, msg)
 
     def __repr__(self):
-        return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+        return '<KeyedProducer batch=%s>' % self.async
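
For completeness, a similar sketch for the reworked KeyedProducer (illustrative only; the broker address, topic names, and keys are assumptions):

    from kafka.client import KafkaClient
    from kafka.producer import KeyedProducer

    kafka = KafkaClient("localhost", 9092)
    producer = KeyedProducer(kafka)  # partitioner defaults to HashedPartitioner

    # send() now takes the topic as its first argument; the partitioner
    # for each topic is created lazily, and topic metadata is loaded on
    # first use rather than at construction time.
    producer.send("my-topic", "user-1", "some message")
    producer.send("my-other-topic", "user-2", "another message")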