summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md26
-rw-r--r--kafka/consumer.py109
-rw-r--r--kafka/producer.py79
-rw-r--r--test/test_integration.py82
4 files changed, 168 insertions, 128 deletions
diff --git a/README.md b/README.md
index 9bf3a03..fbccc1e 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
# Status
-The current version of this package is **0.9.0** and is compatible with
+The current version of this package is **0.9.0** and is compatible with
Kafka brokers running version **0.8.1**.
# Usage
@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost", 9092)
# To send messages synchronously
-producer = SimpleProducer(kafka, "my-topic")
-producer.send_messages("some message")
-producer.send_messages("this method", "is variadic")
+producer = SimpleProducer(kafka)
+producer.send_messages("my-topic", "some message")
+producer.send_messages("my-topic", "this method", "is variadic")
# To send messages asynchronously
-producer = SimpleProducer(kafka, "my-topic", async=True)
-producer.send_messages("async message")
+producer = SimpleProducer(kafka, async=True)
+producer.send_messages("my-topic", "async message")
# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
-producer = SimpleProducer(kafka, "my-topic", async=False,
+producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=2000)
-response = producer.send_messages("async message")
+response = producer.send_messages("my-topic", "async message")
if response:
print(response[0].error)
@@ -62,7 +62,7 @@ if response:
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
-producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+producer = SimpleProducer(kafka, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)
@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
kafka = KafkaClient("localhost", 9092)
# HashedPartitioner is default
-producer = KeyedProducer(kafka, "my-topic")
-producer.send("key1", "some message")
-producer.send("key2", "this methode")
+producer = KeyedProducer(kafka)
+producer.send("my-topic", "key1", "some message")
+producer.send("my-topic", "key2", "this methode")
-producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
+producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```
## Multiprocess consumer
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 5be1bef..28b53ec 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-from collections import defaultdict
from itertools import izip_longest, repeat
import logging
import time
@@ -235,6 +234,12 @@ class SimpleConsumer(Consumer):
buffer_size=FETCH_BUFFER_SIZE_BYTES,
max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
iter_timeout=None):
+ super(SimpleConsumer, self).__init__(
+ client, group, topic,
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
if max_buffer_size is not None and buffer_size > max_buffer_size:
raise ValueError("buffer_size (%d) is greater than "
@@ -245,17 +250,10 @@ class SimpleConsumer(Consumer):
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
- self.fetch_started = defaultdict(bool) # defaults to false
+ self.fetch_offsets = self.offsets.copy()
self.iter_timeout = iter_timeout
self.queue = Queue()
- super(SimpleConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
def __repr__(self):
return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
(self.group, self.topic, str(self.offsets.keys()))
@@ -305,6 +303,10 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ # Reset queue and fetch offsets since they are invalid
+ self.fetch_offsets = self.offsets.copy()
+ self.queue = Queue()
+
def get_messages(self, count=1, block=True, timeout=0.1):
"""
Fetch the specified number of messages
@@ -316,33 +318,69 @@ class SimpleConsumer(Consumer):
it will block forever.
"""
messages = []
- if timeout:
+ if timeout is not None:
max_time = time.time() + timeout
+ new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
- message = self.get_message(block, timeout)
- if message:
- messages.append(message)
+ result = self._get_message(block, timeout, get_partition_info=True,
+ update_offset=False)
+ if result:
+ partition, message = result
+ if self.partition_info:
+ messages.append(result)
+ else:
+ messages.append(message)
+ new_offsets[partition] = message.offset + 1
count -= 1
else:
# Ran out of messages for the last request.
if not block:
# If we're not blocking, break.
break
- if timeout:
+ if timeout is not None:
# If we're blocking and have a timeout, reduce it to the
# appropriate value
timeout = max_time - time.time()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
return messages
- def get_message(self, block=True, timeout=0.1):
+ def get_message(self, block=True, timeout=0.1, get_partition_info=None):
+ return self._get_message(block, timeout, get_partition_info)
+
+ def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
+ update_offset=True):
+ """
+ If no messages can be fetched, returns None.
+ If get_partition_info is None, it defaults to self.partition_info
+ If get_partition_info is True, returns (partition, message)
+ If get_partition_info is False, returns message
+ """
if self.queue.empty():
# We're out of messages, go grab some more.
with FetchContext(self, block, timeout):
self._fetch()
try:
- return self.queue.get_nowait()
+ partition, message = self.queue.get_nowait()
+
+ if update_offset:
+ # Update partition offset
+ self.offsets[partition] = message.offset + 1
+
+ # Count, check and commit messages if necessary
+ self.count_since_commit += 1
+ self._auto_commit()
+
+ if get_partition_info is None:
+ get_partition_info = self.partition_info
+ if get_partition_info:
+ return partition, message
+ else:
+ return message
except Empty:
return None
@@ -367,11 +405,11 @@ class SimpleConsumer(Consumer):
def _fetch(self):
# Create fetch request payloads for all the partitions
requests = []
- partitions = self.offsets.keys()
+ partitions = self.fetch_offsets.keys()
while partitions:
for partition in partitions:
requests.append(FetchRequest(self.topic, partition,
- self.offsets[partition],
+ self.fetch_offsets[partition],
self.buffer_size))
# Send request
responses = self.client.send_fetch_request(
@@ -384,18 +422,9 @@ class SimpleConsumer(Consumer):
partition = resp.partition
try:
for message in resp.messages:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
# Put the message in our queue
- if self.partition_info:
- self.queue.put((partition, message))
- else:
- self.queue.put(message)
+ self.queue.put((partition, message))
+ self.fetch_offsets[partition] = message.offset + 1
except ConsumerFetchSizeTooSmall, e:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
@@ -585,12 +614,11 @@ class MultiProcessConsumer(Consumer):
break
# Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
+ self.offsets[partition] = message.offset + 1
self.start.clear()
- yield message
-
self.count_since_commit += 1
self._auto_commit()
+ yield message
self.start.clear()
@@ -613,9 +641,10 @@ class MultiProcessConsumer(Consumer):
self.size.value = count
self.pause.clear()
- if timeout:
+ if timeout is not None:
max_time = time.time() + timeout
+ new_offsets = {}
while count > 0 and (timeout is None or timeout > 0):
# Trigger consumption only if the queue is empty
# By doing this, we will ensure that consumers do not
@@ -630,16 +659,18 @@ class MultiProcessConsumer(Consumer):
break
messages.append(message)
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset
- self.count_since_commit += 1
- self._auto_commit()
+ new_offsets[partition] = message.offset + 1
count -= 1
- timeout = max_time - time.time()
+ if timeout is not None:
+ timeout = max_time - time.time()
self.size.value = 0
self.start.clear()
self.pause.set()
+ # Update and commit offsets if necessary
+ self.offsets.update(new_offsets)
+ self.count_since_commit += len(messages)
+ self._auto_commit()
+
return messages
diff --git a/kafka/producer.py b/kafka/producer.py
index 6b624f2..12a2934 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -8,7 +8,7 @@ from collections import defaultdict
from itertools import cycle
from multiprocessing import Queue, Process
-from kafka.common import ProduceRequest
+from kafka.common import ProduceRequest, TopicAndPartition
from kafka.partitioner import HashedPartitioner
from kafka.protocol import create_message
@@ -20,7 +20,7 @@ BATCH_SEND_MSG_COUNT = 20
STOP_ASYNC_PRODUCER = -1
-def _send_upstream(topic, queue, client, batch_time, batch_size,
+def _send_upstream(queue, client, batch_time, batch_size,
req_acks, ack_timeout):
"""
Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
# timeout is reached
while count > 0 and timeout >= 0:
try:
- partition, msg = queue.get(timeout=timeout)
+ topic_partition, msg = queue.get(timeout=timeout)
+
except Empty:
break
# Check if the controller has requested us to stop
- if partition == STOP_ASYNC_PRODUCER:
+ if topic_partition == STOP_ASYNC_PRODUCER:
stop = True
break
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
- msgset[partition].append(msg)
+ msgset[topic_partition].append(msg)
# Send collected requests upstream
reqs = []
- for partition, messages in msgset.items():
- req = ProduceRequest(topic, partition, messages)
+ for topic_partition, messages in msgset.items():
+ req = ProduceRequest(topic_partition.topic,
+ topic_partition.partition,
+ messages)
reqs.append(req)
try:
@@ -78,7 +81,6 @@ class Producer(object):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If set to true, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@ class Producer(object):
if self.async:
self.queue = Queue() # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
- args=(self.topic,
- self.queue,
+ args=(self.queue,
self.client.copy(),
batch_send_every_t,
batch_send_every_n,
@@ -131,17 +132,18 @@ class Producer(object):
self.proc.daemon = True
self.proc.start()
- def send_messages(self, partition, *msg):
+ def send_messages(self, topic, partition, *msg):
"""
Helper method to send produce requests
"""
if self.async:
for m in msg:
- self.queue.put((partition, create_message(m)))
+ self.queue.put((TopicAndPartition(topic, partition),
+ create_message(m)))
resp = []
else:
messages = [create_message(m) for m in msg]
- req = ProduceRequest(self.topic, partition, messages)
+ req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
@@ -169,7 +171,6 @@ class SimpleProducer(Producer):
Params:
client - The Kafka client instance to use
- topic - The topic for sending messages to
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, async=False,
+ def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client.load_metadata_for_topics(topic)
- self.next_partition = cycle(client.topic_partitions[topic])
-
+ self.partition_cycles = {}
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send_messages(self, *msg):
- partition = self.next_partition.next()
- return super(SimpleProducer, self).send_messages(partition, *msg)
+ def _next_partition(self, topic):
+ if topic not in self.partition_cycles:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
+ return self.partition_cycles[topic].next()
+
+ def send_messages(self, topic, *msg):
+ partition = self._next_partition(topic)
+ return super(SimpleProducer, self).send_messages(topic, partition, *msg)
def __repr__(self):
- return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
+ return '<SimpleProducer batch=%s>' % self.async
class KeyedProducer(Producer):
@@ -209,7 +214,6 @@ class KeyedProducer(Producer):
Args:
client - The kafka client instance
- topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, partitioner=None, async=False,
+ def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
- self.topic = topic
- client.load_metadata_for_topics(topic)
-
if not partitioner:
partitioner = HashedPartitioner
-
- self.partitioner = partitioner(client.topic_partitions[topic])
+ self.partitioner_class = partitioner
+ self.partitioners = {}
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, batch_send,
batch_send_every_n,
batch_send_every_t)
- def send(self, key, msg):
- partitions = self.client.topic_partitions[self.topic]
- partition = self.partitioner.partition(key, partitions)
- return self.send_messages(partition, msg)
+ def _next_partition(self, topic, key):
+ if topic not in self.partitioners:
+ if topic not in self.client.topic_partitions:
+ self.client.load_metadata_for_topics(topic)
+ self.partitioners[topic] = \
+ self.partitioner_class(self.client.topic_partitions[topic])
+ partitioner = self.partitioners[topic]
+ return partitioner.partition(key, self.client.topic_partitions[topic])
+
+ def send(self, topic, key, msg):
+ partition = self._next_partition(topic, key)
+ return self.send_messages(topic, partition, msg)
def __repr__(self):
- return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
+ return '<KeyedProducer batch=%s>' % self.async
diff --git a/test/test_integration.py b/test/test_integration.py
index 5a22630..d0da523 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -284,8 +284,8 @@ class TestKafkaClient(KafkaTestCase):
# Producer Tests
def test_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic)
- resp = producer.send_messages("one", "two")
+ producer = SimpleProducer(self.client)
+ resp = producer.send_messages(self.topic, "one", "two")
# Will go to partition 0
self.assertEquals(len(resp), 1)
@@ -293,7 +293,7 @@ class TestKafkaClient(KafkaTestCase):
self.assertEquals(resp[0].offset, 0) # offset of first msg
# Will go to partition 1
- resp = producer.send_messages("three")
+ resp = producer.send_messages(self.topic, "three")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 0) # offset of first msg
@@ -315,7 +315,7 @@ class TestKafkaClient(KafkaTestCase):
self.assertEquals(messages[0].message.value, "three")
# Will go to partition 0
- resp = producer.send_messages("four", "five")
+ resp = producer.send_messages(self.topic, "four", "five")
self.assertEquals(len(resp), 1)
self.assertEquals(resp[0].error, 0)
self.assertEquals(resp[0].offset, 2) # offset of first msg
@@ -323,12 +323,12 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_round_robin_partitioner(self):
- producer = KeyedProducer(self.client, self.topic,
+ producer = KeyedProducer(self.client,
partitioner=RoundRobinPartitioner)
- producer.send("key1", "one")
- producer.send("key2", "two")
- producer.send("key3", "three")
- producer.send("key4", "four")
+ producer.send(self.topic, "key1", "one")
+ producer.send(self.topic, "key2", "two")
+ producer.send(self.topic, "key3", "three")
+ producer.send(self.topic, "key4", "four")
fetch1 = FetchRequest(self.topic, 0, 0, 1024)
fetch2 = FetchRequest(self.topic, 1, 0, 1024)
@@ -357,12 +357,12 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_hashed_partitioner(self):
- producer = KeyedProducer(self.client, self.topic,
+ producer = KeyedProducer(self.client,
partitioner=HashedPartitioner)
- producer.send(1, "one")
- producer.send(2, "two")
- producer.send(3, "three")
- producer.send(4, "four")
+ producer.send(self.topic, 1, "one")
+ producer.send(self.topic, 2, "two")
+ producer.send(self.topic, 3, "three")
+ producer.send(self.topic, 4, "four")
fetch1 = FetchRequest(self.topic, 0, 0, 1024)
fetch2 = FetchRequest(self.topic, 1, 0, 1024)
@@ -391,9 +391,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_acks_none(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_NOT_REQUIRED)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -410,9 +410,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_acks_local_write(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -430,9 +430,9 @@ class TestKafkaClient(KafkaTestCase):
def test_acks_cluster_commit(self):
producer = SimpleProducer(
- self.client, self.topic,
+ self.client,
req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
- resp = producer.send_messages("one")
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 1)
fetch = FetchRequest(self.topic, 0, 0, 1024)
@@ -449,8 +449,8 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_async_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic, async=True)
- resp = producer.send_messages("one")
+ producer = SimpleProducer(self.client, async=True)
+ resp = producer.send_messages(self.topic, "one")
self.assertEquals(len(resp), 0)
# Give it some time
@@ -470,9 +470,9 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_async_keyed_producer(self):
- producer = KeyedProducer(self.client, self.topic, async=True)
+ producer = KeyedProducer(self.client, async=True)
- resp = producer.send("key1", "one")
+ resp = producer.send(self.topic, "key1", "one")
self.assertEquals(len(resp), 0)
# Give it some time
@@ -492,14 +492,14 @@ class TestKafkaClient(KafkaTestCase):
producer.stop()
def test_batched_simple_producer(self):
- producer = SimpleProducer(self.client, self.topic,
+ producer = SimpleProducer(self.client,
batch_send=True,
batch_send_every_n=10,
batch_send_every_t=20)
# Send 5 messages and do a fetch
msgs = ["message-%d" % i for i in range(0, 5)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Batch mode is async. No ack
self.assertEquals(len(resp), 0)
@@ -522,7 +522,7 @@ class TestKafkaClient(KafkaTestCase):
# Send 5 more messages, wait for 2 seconds and do a fetch
msgs = ["message-%d" % i for i in range(5, 10)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
# Give it some time
time.sleep(2)
@@ -542,9 +542,9 @@ class TestKafkaClient(KafkaTestCase):
# Send 7 messages and wait for 20 seconds
msgs = ["message-%d" % i for i in range(10, 15)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
msgs = ["message-%d" % i for i in range(15, 17)]
- resp = producer.send_messages(*msgs)
+ resp = producer.send_messages(self.topic, *msgs)
fetch1 = FetchRequest(self.topic, 0, 5, 1024)
fetch2 = FetchRequest(self.topic, 1, 5, 1024)
@@ -846,25 +846,25 @@ class TestFailover(KafkaTestCase):
def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client, topic)
+ producer = SimpleProducer(self.client)
for i in range(1, 4):
# XXX unfortunately, the conns dict needs to be warmed for this to work
# XXX unfortunately, for warming to work, we need at least as many partitions as brokers
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kil leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
with self.assertRaises(FailedPayloadsError):
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -877,22 +877,22 @@ class TestFailover(KafkaTestCase):
def test_switch_leader_async(self):
key, topic, partition = random_string(5), self.topic, 0
- producer = SimpleProducer(self.client, topic, async=True)
+ producer = SimpleProducer(self.client, async=True)
for i in range(1, 4):
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
# kil leader for partition 0
broker = self._kill_leader(topic, partition)
# expect failure, reload meta data
- producer.send_messages('part 1')
- producer.send_messages('part 2')
+ producer.send_messages(self.topic, 'part 1')
+ producer.send_messages(self.topic, 'part 2')
time.sleep(1)
# send to new leader
- self._send_random_messages(producer, 10)
+ self._send_random_messages(producer, self.topic, 10)
broker.open()
time.sleep(3)
@@ -903,9 +903,9 @@ class TestFailover(KafkaTestCase):
producer.stop()
- def _send_random_messages(self, producer, n):
+ def _send_random_messages(self, producer, topic, n):
for j in range(n):
- resp = producer.send_messages(random_string(10))
+ resp = producer.send_messages(topic, random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
time.sleep(1) # give it some time