summaryrefslogtreecommitdiff
path: root/test/testutil.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/testutil.py')
-rw-r--r--test/testutil.py11
1 files changed, 11 insertions, 0 deletions
diff --git a/test/testutil.py b/test/testutil.py
index 3272262..650f9bf 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -47,6 +47,17 @@ def current_offset(client, topic, partition, kafka_broker=None):
return offsets.offsets[0]
+def assert_message_count(messages, num_messages):
+ """Check that we received the expected number of messages with no duplicates."""
+ # Make sure we got them all
+ assert len(messages) == num_messages
+ # Make sure there are no duplicates
+ # Note: Currently duplicates are identified only using key/value. Other attributes like topic, partition, headers,
+ # timestamp, etc are ignored... this could be changed if necessary, but will be more tolerant of dupes.
+ unique_messages = {(m.key, m.value) for m in messages}
+ assert len(unique_messages) == num_messages
+
+
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None