diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-11 21:48:31 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-11 21:48:31 -0700 |
commit | 59400bdad759fddc1d58bdae64de953968316b61 (patch) | |
tree | 0a7df1ada8e4e502c90215d9b54bf16684b3b8e1 /test | |
parent | 8c0792581d8a38822c01b40f5d3926c659b0c439 (diff) | |
download | kafka-python-conn_lock_async_send.tar.gz |
Synchronize puts to KafkaConsumer protocol buffer during async sendsconn_lock_async_send
Diffstat (limited to 'test')
-rw-r--r-- | test/test_conn.py | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/test/test_conn.py b/test/test_conn.py index 27d77be..953c112 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -112,8 +112,8 @@ def test_send_connecting(conn): def test_send_max_ifr(conn): conn.state = ConnectionStates.CONNECTED max_ifrs = conn.config['max_in_flight_requests_per_connection'] - for _ in range(max_ifrs): - conn.in_flight_requests.append('foo') + for i in range(max_ifrs): + conn.in_flight_requests[i] = 'foo' f = conn.send('foobar') assert f.failed() is True assert isinstance(f.exception, Errors.TooManyInFlightRequests) @@ -170,9 +170,9 @@ def test_send_error(_socket, conn): def test_can_send_more(conn): assert conn.can_send_more() is True max_ifrs = conn.config['max_in_flight_requests_per_connection'] - for _ in range(max_ifrs): + for i in range(max_ifrs): assert conn.can_send_more() is True - conn.in_flight_requests.append('foo') + conn.in_flight_requests[i] = 'foo' assert conn.can_send_more() is False @@ -311,3 +311,23 @@ def test_relookup_on_failure(): assert conn._sock_afi == afi2 assert conn._sock_addr == sockaddr2 conn.close() + + +def test_requests_timed_out(conn): + with mock.patch("time.time", return_value=0): + # No in-flight requests, not timed out + assert not conn.requests_timed_out() + + # Single request, timestamp = now (0) + conn.in_flight_requests[0] = ('foo', 0) + assert not conn.requests_timed_out() + + # Add another request w/ timestamp > request_timeout ago + request_timeout = conn.config['request_timeout_ms'] + expired_timestamp = 0 - request_timeout - 1 + conn.in_flight_requests[1] = ('bar', expired_timestamp) + assert conn.requests_timed_out() + + # Drop the expired request and we should be good to go again + conn.in_flight_requests.pop(1) + assert not conn.requests_timed_out() |