summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md34
-rw-r--r--kafka/__init__.py6
-rw-r--r--kafka/client.py13
-rw-r--r--kafka/conn.py2
-rw-r--r--kafka/producer.py207
-rw-r--r--test/test_integration.py277
6 files changed, 509 insertions, 30 deletions
diff --git a/README.md b/README.md
index 60cc745..829fab2 100644
--- a/README.md
+++ b/README.md
@@ -26,14 +26,46 @@ development, APIs are subject to change.
```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
kafka = KafkaClient("localhost", 9092)
+# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
+# To send messages asynchronously
+producer = SimpleProducer(kafka, "my-topic", async=True)
+producer.send_messages("async message")
+
+# To wait for acknowledgements
+# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
+# a local log before sending response
+# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
+# by all in sync replicas before sending a response
+producer = SimpleProducer(kafka, "my-topic", async=False,
+ req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+ acks_timeout=2000)
+
+response = producer.send_messages("async message")
+
+if response:
+ print(response[0].error)
+ print(response[0].offset)
+
+# To send messages in batch. You can use any of the available
+# producers for doing this. The following producer will collect
+# messages in batch and send them to Kafka after 20 messages are
+# collected or every 60 seconds
+# Notes:
+# * If the producer dies before the messages are sent, there will be losses
+# * Call producer.stop() to send the messages and cleanup
+producer = SimpleProducer(kafka, "my-topic", batch_send=True,
+ batch_send_every_n=20,
+ batch_send_every_t=60)
+
+# To consume messages
consumer = SimpleConsumer(kafka, "my-group", "my-topic")
for message in consumer:
print(message)
diff --git a/kafka/__init__.py b/kafka/__init__.py
index d229169..e352c03 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -9,10 +9,12 @@ from kafka.conn import KafkaConnection
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message
)
-from kafka.producer import SimpleProducer
+from kafka.producer import SimpleProducer, KeyedProducer
+from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
from kafka.consumer import SimpleConsumer
__all__ = [
- 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'SimpleConsumer',
+ 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
+ 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
'create_message', 'create_gzip_message', 'create_snappy_message'
]
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..b3f8667 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -161,12 +161,16 @@ class KafkaClient(object):
# Send the request, recv the response
conn.send(requestId, request)
+
+ if decoder_fn is None:
+ continue
+
response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response
# Order the accumulated responses by the original key order
- return (acc[k] for k in original_keys)
+ return (acc[k] for k in original_keys) if acc else ()
#################
# Public API #
@@ -201,7 +205,12 @@ class KafkaClient(object):
encoder = partial(KafkaProtocol.encode_produce_request,
acks=acks, timeout=timeout)
- decoder = KafkaProtocol.decode_produce_response
+
+ if acks == 0:
+ decoder = None
+ else:
+ decoder = KafkaProtocol.decode_produce_response
+
resps = self._send_broker_aware_request(payloads, encoder, decoder)
out = []
diff --git a/kafka/conn.py b/kafka/conn.py
index fce1fdc..aba3ada 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -76,11 +76,11 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent != None:
raise RuntimeError("Kafka went away")
- self.data = self._consume_response()
def recv(self, requestId):
"Get a response from Kafka"
log.debug("Reading response %d from Kafka" % requestId)
+ self.data = self._consume_response()
return self.data
def close(self):
diff --git a/kafka/producer.py b/kafka/producer.py
index 69c3830..06e468d 100644
--- a/kafka/producer.py
+++ b/kafka/producer.py
@@ -1,5 +1,10 @@
+from collections import defaultdict
+from datetime import datetime, timedelta
from itertools import cycle
+from multiprocessing import Queue, Process
+from Queue import Empty
import logging
+import sys
from kafka.common import ProduceRequest
from kafka.protocol import create_message
@@ -7,26 +12,175 @@ from kafka.partitioner import HashedPartitioner
log = logging.getLogger("kafka")
+BATCH_SEND_DEFAULT_INTERVAL = 20
+BATCH_SEND_MSG_COUNT = 20
-class SimpleProducer(object):
+STOP_ASYNC_PRODUCER = -1
+
+
+class Producer(object):
"""
- A simple, round-robbin producer. Each message goes to exactly one partition
+ Base class to be used by producers
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If set to true, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic):
+
+ ACK_NOT_REQUIRED = 0 # No ack is required
+ ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
+ ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
+
+ DEFAULT_ACK_TIMEOUT = 1000
+
+ def __init__(self, client, async=False,
+ req_acks=ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=DEFAULT_ACK_TIMEOUT,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+
+ if batch_send:
+ async = True
+ assert batch_send_every_n > 0
+ assert batch_send_every_t > 0
+ else:
+ batch_send_every_n = 1
+ batch_send_every_t = 3600
+
self.client = client
+ self.async = async
+ self.req_acks = req_acks
+ self.ack_timeout = ack_timeout
+ self.batch_send = batch_send
+ self.batch_size = batch_send_every_n
+ self.batch_time = batch_send_every_t
+
+ if self.async:
+ self.queue = Queue() # Messages are sent through this queue
+ self.proc = Process(target=self._send_upstream, args=(self.queue,))
+ self.proc.daemon = True # Process will die if main thread exits
+ self.proc.start()
+
+ def _send_upstream(self, queue):
+ """
+ Listen on the queue for a specified number of messages or till
+ a specified timeout and send them upstream to the brokers in one
+ request
+ """
+ stop = False
+
+ while not stop:
+ timeout = self.batch_time
+ send_at = datetime.now() + timedelta(seconds=timeout)
+ count = self.batch_size
+ msgset = defaultdict(list)
+
+ # Keep fetching till we gather enough messages or a
+ # timeout is reached
+ while count > 0 and timeout >= 0:
+ try:
+ partition, msg = queue.get(timeout=timeout)
+ except Empty:
+ break
+
+ # Check if the controller has requested us to stop
+ if partition == STOP_ASYNC_PRODUCER:
+ stop = True
+ break
+
+ # Adjust the timeout to match the remaining period
+ count -= 1
+ timeout = (send_at - datetime.now()).total_seconds()
+ msgset[partition].append(msg)
+
+ # Send collected requests upstream
+ reqs = []
+ for partition, messages in msgset.items():
+ req = ProduceRequest(self.topic, partition, messages)
+ reqs.append(req)
+
+ try:
+ self.client.send_produce_request(reqs, acks=self.req_acks,
+ timeout=self.ack_timeout)
+ except Exception as exp:
+ log.error("Error sending message", exc_info=sys.exc_info())
+
+ def send_messages(self, partition, *msg):
+ """
+ Helper method to send produce requests
+ """
+ if self.async:
+ for m in msg:
+ self.queue.put((partition, create_message(m)))
+ resp = []
+ else:
+ messages = [create_message(m) for m in msg]
+ req = ProduceRequest(self.topic, partition, messages)
+ resp = self.client.send_produce_request([req], acks=self.req_acks,
+ timeout=self.ack_timeout)
+ return resp
+
+ def stop(self, timeout=1):
+ """
+ Stop the producer. Optionally wait for the specified timeout before
+ forcefully cleaning up.
+ """
+ if self.async:
+ self.queue.put((STOP_ASYNC_PRODUCER, None))
+ self.proc.join(timeout)
+
+ if self.proc.is_alive():
+ self.proc.terminate()
+
+
+class SimpleProducer(Producer):
+ """
+ A simple, round-robbin producer. Each message goes to exactly one partition
+
+ Params:
+ client - The Kafka client instance to use
+ topic - The topic for sending messages to
+ async - If True, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ req_acks - A value indicating the acknowledgements that the server must
+ receive before responding to the request
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
+ """
+ def __init__(self, client, topic, async=False,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- self.client._load_metadata_for_topics(topic)
- self.next_partition = cycle(self.client.topic_partitions[topic])
+ client._load_metadata_for_topics(topic)
+ self.next_partition = cycle(client.topic_partitions[topic])
- def send_messages(self, *msg):
- req = ProduceRequest(self.topic, self.next_partition.next(),
- messages=[create_message(m) for m in msg])
+ super(SimpleProducer, self).__init__(client, async, req_acks,
+ ack_timeout, batch_send,
+ batch_send_every_n,
+ batch_send_every_t)
- resp = self.client.send_produce_request([req])[0]
- assert resp.error == 0
+ def send_messages(self, *msg):
+ partition = self.next_partition.next()
+ return super(SimpleProducer, self).send_messages(partition, *msg)
-class KeyedProducer(object):
+class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
@@ -35,23 +189,34 @@ class KeyedProducer(object):
topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
+ async - If True, the messages are sent asynchronously via another
+ thread (process). We will not wait for a response to these
+ ack_timeout - Value (in milliseconds) indicating a timeout for waiting
+ for an acknowledgement
+ batch_send - If True, messages are send in batches
+ batch_send_every_n - If set, messages are send in batches of this size
+ batch_send_every_t - If set, messages are send after this timeout
"""
- def __init__(self, client, topic, partitioner=None):
- self.client = client
+ def __init__(self, client, topic, partitioner=None, async=False,
+ req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
+ ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
+ batch_send=False,
+ batch_send_every_n=BATCH_SEND_MSG_COUNT,
+ batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
self.topic = topic
- self.client._load_metadata_for_topics(topic)
+ client._load_metadata_for_topics(topic)
if not partitioner:
partitioner = HashedPartitioner
- self.partitioner = partitioner(self.client.topic_partitions[topic])
+ self.partitioner = partitioner(client.topic_partitions[topic])
+
+ super(KeyedProducer, self).__init__(client, async, req_acks,
+ ack_timeout, batch_send,
+ batch_send_every_n,
+ batch_send_every_t)
def send(self, key, msg):
partitions = self.client.topic_partitions[self.topic]
partition = self.partitioner.partition(key, partitions)
-
- req = ProduceRequest(self.topic, partition,
- messages=[create_message(msg)])
-
- resp = self.client.send_produce_request([req])[0]
- assert resp.error == 0
+ return self.send_messages(partition, msg)
diff --git a/test/test_integration.py b/test/test_integration.py
index d607b73..6d96b6a 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -1,5 +1,6 @@
import logging
import unittest
+import time
from kafka import * # noqa
from kafka.common import * # noqa
@@ -254,12 +255,23 @@ class TestKafkaClient(unittest.TestCase):
def test_simple_producer(self):
producer = SimpleProducer(self.client, "test_simple_producer")
- producer.send_messages("one", "two")
- producer.send_messages("three")
+ resp = producer.send_messages("one", "two")
+
+ # Will go to partition 0
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 0) # offset of first msg
+
+ # Will go to partition 1
+ resp = producer.send_messages("three")
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 0) # offset of first msg
fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024)
fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024)
- fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2])
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
self.assertEquals(fetch_resp1.error, 0)
self.assertEquals(fetch_resp1.highwaterMark, 2)
messages = list(fetch_resp1.messages)
@@ -272,6 +284,265 @@ class TestKafkaClient(unittest.TestCase):
self.assertEquals(len(messages), 1)
self.assertEquals(messages[0].message.value, "three")
+ # Will go to partition 0
+ resp = producer.send_messages("four", "five")
+ self.assertEquals(len(resp), 1)
+ self.assertEquals(resp[0].error, 0)
+ self.assertEquals(resp[0].offset, 2) # offset of first msg
+
+ producer.stop()
+
+ def test_round_robin_partitioner(self):
+ producer = KeyedProducer(self.client, "test_round_robin_partitioner",
+ partitioner=RoundRobinPartitioner)
+ producer.send("key1", "one")
+ producer.send("key2", "two")
+ producer.send("key3", "three")
+ producer.send("key4", "four")
+
+ fetch1 = FetchRequest("test_round_robin_partitioner", 0, 0, 1024)
+ fetch2 = FetchRequest("test_round_robin_partitioner", 1, 0, 1024)
+
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 2)
+ self.assertEquals(fetch_resp1.partition, 0)
+
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "one")
+ self.assertEquals(messages[1].message.value, "three")
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 2)
+ self.assertEquals(fetch_resp2.partition, 1)
+
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "two")
+ self.assertEquals(messages[1].message.value, "four")
+
+ producer.stop()
+
+ def test_hashed_partitioner(self):
+ producer = KeyedProducer(self.client, "test_hash_partitioner",
+ partitioner=HashedPartitioner)
+ producer.send(1, "one")
+ producer.send(2, "two")
+ producer.send(3, "three")
+ producer.send(4, "four")
+
+ fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024)
+ fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024)
+
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp1.highwaterMark, 2)
+ self.assertEquals(fetch_resp1.partition, 0)
+
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "two")
+ self.assertEquals(messages[1].message.value, "four")
+
+ self.assertEquals(fetch_resp2.error, 0)
+ self.assertEquals(fetch_resp2.highwaterMark, 2)
+ self.assertEquals(fetch_resp2.partition, 1)
+
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 2)
+ self.assertEquals(messages[0].message.value, "one")
+ self.assertEquals(messages[1].message.value, "three")
+
+ producer.stop()
+
+ def test_acks_none(self):
+ producer = SimpleProducer(self.client, "test_acks_none",
+ req_acks=SimpleProducer.ACK_NOT_REQUIRED)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 0)
+
+ fetch = FetchRequest("test_acks_none", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_acks_local_write(self):
+ producer = SimpleProducer(self.client, "test_acks_local_write",
+ req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 1)
+
+ fetch = FetchRequest("test_acks_local_write", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_acks_cluster_commit(self):
+ producer = SimpleProducer(self.client, "test_acks_cluster_commit",
+ req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT)
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 1)
+
+ fetch = FetchRequest("test_acks_cluster_commit", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_async_simple_producer(self):
+ producer = SimpleProducer(self.client, "test_async_simple_producer",
+ async=True)
+
+ resp = producer.send_messages("one")
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch = FetchRequest("test_async_simple_producer", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_async_keyed_producer(self):
+ producer = KeyedProducer(self.client, "test_async_keyed_producer",
+ async=True)
+
+ resp = producer.send("key1", "one")
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch = FetchRequest("test_async_keyed_producer", 0, 0, 1024)
+ fetch_resp = self.client.send_fetch_request([fetch])
+
+ self.assertEquals(fetch_resp[0].error, 0)
+ self.assertEquals(fetch_resp[0].highwaterMark, 1)
+ self.assertEquals(fetch_resp[0].partition, 0)
+
+ messages = list(fetch_resp[0].messages)
+ self.assertEquals(len(messages), 1)
+ self.assertEquals(messages[0].message.value, "one")
+
+ producer.stop()
+
+ def test_batched_simple_producer(self):
+ producer = SimpleProducer(self.client, "test_batched_simple_producer",
+ batch_send=True,
+ batch_send_every_n=10,
+ batch_send_every_t=20)
+
+ # Send 5 messages and do a fetch
+ msgs = ["message-%d" % i for i in range(0, 5)]
+ resp = producer.send_messages(*msgs)
+
+ # Batch mode is async. No ack
+ self.assertEquals(len(resp), 0)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 0)
+
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 0)
+
+ # Send 5 more messages, wait for 2 seconds and do a fetch
+ msgs = ["message-%d" % i for i in range(5, 10)]
+ resp = producer.send_messages(*msgs)
+
+ # Give it some time
+ time.sleep(2)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 0, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 0, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ messages = list(fetch_resp1.messages)
+ self.assertEquals(len(messages), 5)
+
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 5)
+
+ # Send 7 messages and wait for 20 seconds
+ msgs = ["message-%d" % i for i in range(10, 15)]
+ resp = producer.send_messages(*msgs)
+ msgs = ["message-%d" % i for i in range(15, 17)]
+ resp = producer.send_messages(*msgs)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 0)
+
+ # Give it some time
+ time.sleep(22)
+
+ fetch1 = FetchRequest("test_batched_simple_producer", 0, 5, 1024)
+ fetch2 = FetchRequest("test_batched_simple_producer", 1, 5, 1024)
+ fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1,
+ fetch2])
+
+ self.assertEquals(fetch_resp1.error, 0)
+ self.assertEquals(fetch_resp2.error, 0)
+ messages = list(fetch_resp1.messages) + list(fetch_resp2.messages)
+ self.assertEquals(len(messages), 7)
+
+ producer.stop()
+
class TestSimpleConsumer(unittest.TestCase):
@classmethod