summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_consumer_group.py13
1 files changed, 13 insertions, 0 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 03656fa..6ef2020 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -76,10 +76,23 @@ def test_group(kafka_broker, topic):
timeout = time.time() + 35
while True:
for c in range(num_consumers):
+
+ # Verify all consumers have been created
if c not in consumers:
break
+
+ # Verify all consumers have an assignment
elif not consumers[c].assignment():
break
+
+ # Verify all consumers are in the same generation
+ generations = set()
+ for consumer in six.itervalues(consumers):
+ generations.add(consumer._coordinator.generation)
+ if len(generations) != 1:
+ break
+
+ # If all checks passed, log state and break while loop
else:
for c in range(num_consumers):
logging.info("[%s] %s %s: %s", c,