summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-13 23:19:53 -0700
committerDana Powers <dana.powers@gmail.com>2016-03-13 23:19:53 -0700
commit0c272a05b3b29515759620803053d091ef98260d (patch)
tree0bc1548580f50008b23918f2312b131fafff0e0b /test
parenta45cd4d17bd7f6d1fe9ae887f5847182a799ca07 (diff)
parent4ffd4e94e05e9494bd5ec32bd1037f65ed820986 (diff)
downloadkafka-python-0c272a05b3b29515759620803053d091ef98260d.tar.gz
Merge pull request #585 from dpkp/truncate_buffer
Truncate deallocated message buffers
Diffstat (limited to 'test')
-rw-r--r--test/test_producer.py27
1 files changed, 22 insertions, 5 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 829c6f2..f11bb05 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -3,10 +3,23 @@ import sys
import pytest
from kafka import KafkaConsumer, KafkaProducer
+from kafka.producer.buffer import SimpleBufferPool
from test.conftest import version
from test.testutil import random_string
+def test_buffer_pool():
+ pool = SimpleBufferPool(1000, 1000)
+
+ buf1 = pool.allocate(1000, 1000)
+ message = ''.join(map(str, range(100)))
+ buf1.write(message.encode('utf-8'))
+ pool.deallocate(buf1)
+
+ buf2 = pool.allocate(1000, 1000)
+ assert buf2.read() == b''
+
+
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
def test_end_to_end(kafka_broker, compression):
@@ -33,17 +46,21 @@ def test_end_to_end(kafka_broker, compression):
topic = random_string(5)
- for i in range(1000):
- producer.send(topic, 'msg %d' % i)
- producer.flush(timeout=30)
+ messages = 100
+ futures = []
+ for i in range(messages):
+ futures.append(producer.send(topic, 'msg %d' % i))
+ ret = [f.get(timeout=30) for f in futures]
+ assert len(ret) == messages
+
producer.close()
consumer.subscribe([topic])
msgs = set()
- for i in range(1000):
+ for i in range(messages):
try:
msgs.add(next(consumer).value)
except StopIteration:
break
- assert msgs == set(['msg %d' % i for i in range(1000)])
+ assert msgs == set(['msg %d' % i for i in range(messages)])