diff options
-rw-r--r-- | test/test_consumer_group.py | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 03656fa..6ef2020 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -76,10 +76,23 @@ def test_group(kafka_broker, topic): timeout = time.time() + 35 while True: for c in range(num_consumers): + + # Verify all consumers have been created if c not in consumers: break + + # Verify all consumers have an assignment elif not consumers[c].assignment(): break + + # Verify all consumers are in the same generation + generations = set() + for consumer in six.itervalues(consumers): + generations.add(consumer._coordinator.generation) + if len(generations) != 1: + break + + # If all checks passed, log state and break while loop else: for c in range(num_consumers): logging.info("[%s] %s %s: %s", c, |