diff options
-rw-r--r-- | test/test_consumer_group.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 3d10f8f..34b1be4 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -52,6 +52,7 @@ def test_group(kafka_broker, topic): connect_str = 'localhost:' + str(kafka_broker.port) consumers = {} stop = {} + threads = {} messages = collections.defaultdict(list) def consumer_thread(i): assert i not in consumers @@ -61,7 +62,7 @@ def test_group(kafka_broker, topic): bootstrap_servers=connect_str, heartbeat_interval_ms=500) while not stop[i].is_set(): - for tp, records in six.itervalues(consumers[i].poll()): + for tp, records in six.itervalues(consumers[i].poll(100)): messages[i][tp].extend(records) consumers[i].close() del consumers[i] @@ -70,8 +71,8 @@ def test_group(kafka_broker, topic): num_consumers = 4 for i in range(num_consumers): t = threading.Thread(target=consumer_thread, args=(i,)) - t.daemon = True t.start() + threads[i] = t try: timeout = time.time() + 35 @@ -116,6 +117,7 @@ def test_group(kafka_broker, topic): finally: for c in range(num_consumers): stop[c].set() + threads[c].join() @pytest.fixture |