summaryrefslogtreecommitdiff
path: root/test/test_partitioner.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
commit077dc4742ffa82584946379790424faf4c6ba47f (patch)
treebd14706a8dfc429f6bf211bac02ad21af967c6ce /test/test_partitioner.py
parent48e96822b3ec4f897438a2d1cdb735f51648cb48 (diff)
parent85c0dd2579eb6aa0b9492d9082d0f4cf4d8ea39d (diff)
downloadkafka-python-077dc4742ffa82584946379790424faf4c6ba47f.tar.gz
Merge pull request #515 from dpkp/kafka_producer
KafkaProducer
Diffstat (limited to 'test/test_partitioner.py')
-rw-r--r--test/test_partitioner.py64
1 files changed, 42 insertions, 22 deletions
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 67cd83b..52b6b81 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -1,23 +1,43 @@
+import pytest
import six
-from . import unittest
-
-from kafka.partitioner import (Murmur2Partitioner)
-
-class TestMurmurPartitioner(unittest.TestCase):
- def test_hash_bytes(self):
- p = Murmur2Partitioner(range(1000))
- self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
-
- def test_hash_encoding(self):
- p = Murmur2Partitioner(range(1000))
- self.assertEqual(p.partition('test'), p.partition(u'test'))
-
- def test_murmur2_java_compatibility(self):
- p = Murmur2Partitioner(range(1000))
- # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
- self.assertEqual(681, p.partition(b''))
- self.assertEqual(524, p.partition(b'a'))
- self.assertEqual(434, p.partition(b'ab'))
- self.assertEqual(107, p.partition(b'abc'))
- self.assertEqual(566, p.partition(b'123456789'))
- self.assertEqual(742, p.partition(b'\x00 '))
+
+from kafka.partitioner import Murmur2Partitioner
+from kafka.partitioner.default import DefaultPartitioner
+
+
+def test_default_partitioner():
+ partitioner = DefaultPartitioner()
+ all_partitions = list(range(100))
+ available = all_partitions
+ # partitioner should return the same partition for the same key
+ p1 = partitioner(b'foo', all_partitions, available)
+ p2 = partitioner(b'foo', all_partitions, available)
+ assert p1 == p2
+ assert p1 in all_partitions
+
+ # when key is None, choose one of available partitions
+ assert partitioner(None, all_partitions, [123]) == 123
+
+ # with fallback to all_partitions
+ assert partitioner(None, all_partitions, []) in all_partitions
+
+
+def test_hash_bytes():
+ p = Murmur2Partitioner(range(1000))
+ assert p.partition(bytearray(b'test')) == p.partition(b'test')
+
+
+def test_hash_encoding():
+ p = Murmur2Partitioner(range(1000))
+ assert p.partition('test') == p.partition(u'test')
+
+
+def test_murmur2_java_compatibility():
+ p = Murmur2Partitioner(range(1000))
+ # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+ assert p.partition(b'') == 681
+ assert p.partition(b'a') == 524
+ assert p.partition(b'ab') == 434
+ assert p.partition(b'abc') == 107
+ assert p.partition(b'123456789') == 566
+ assert p.partition(b'\x00 ') == 742