diff options
-rw-r--r-- | kafka/client_async.py | 12 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 3 | ||||
-rw-r--r-- | kafka/consumer/group.py | 6 | ||||
-rw-r--r-- | kafka/producer/sender.py | 2 | ||||
-rw-r--r-- | test/test_client_async.py | 33 |
5 files changed, 25 insertions, 31 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ecd2cea..4e4e835 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -495,7 +495,7 @@ class KafkaClient(object): return self._conns[node_id].send(request) - def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True): + def poll(self, timeout_ms=None, future=None, delayed_tasks=True): """Try to read and write to sockets. This method will also attempt to complete node connections, refresh @@ -507,9 +507,6 @@ class KafkaClient(object): timeout will be the minimum of timeout, request timeout and metadata timeout. Default: request_timeout_ms future (Future, optional): if provided, blocks until future.is_done - sleep (bool): if True and there is nothing to do (no connections - or requests in flight), will sleep for duration timeout before - returning empty results. Default: False. Returns: list: responses received (can be empty) @@ -553,7 +550,7 @@ class KafkaClient(object): self.config['request_timeout_ms']) timeout = max(0, timeout / 1000.0) # avoid negative timeouts - responses.extend(self._poll(timeout, sleep=sleep)) + responses.extend(self._poll(timeout)) # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done @@ -562,10 +559,7 @@ class KafkaClient(object): return responses - def _poll(self, timeout, sleep=True): - # select on reads across all connected sockets, blocking up to timeout - assert self.in_flight_request_count() > 0 or self._connecting or sleep - + def _poll(self, timeout): responses = [] processed = set() diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index c0d6075..10ed187 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -275,8 +275,7 @@ class Fetcher(six.Iterator): if future.exception.invalid_metadata: refresh_future = self._client.cluster.request_update() - self._client.poll( - future=refresh_future, sleep=True, timeout_ms=remaining_ms) + self._client.poll(future=refresh_future, timeout_ms=remaining_ms) else: time.sleep(self.config['retry_backoff_ms'] / 1000.0) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 54a3711..2de254d 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -613,7 +613,7 @@ class KafkaConsumer(six.Iterator): # Send any new fetches (won't resend pending fetches) self._fetcher.send_fetches() - self._client.poll(timeout_ms=timeout_ms, sleep=True) + self._client.poll(timeout_ms=timeout_ms) records, _ = self._fetcher.fetched_records(max_records) return records @@ -1019,7 +1019,7 @@ class KafkaConsumer(six.Iterator): poll_ms = 1000 * (self._consumer_timeout - time.time()) if not self._fetcher.in_flight_fetches(): poll_ms = 0 - self._client.poll(timeout_ms=poll_ms, sleep=True) + self._client.poll(timeout_ms=poll_ms) # We need to make sure we at least keep up with scheduled tasks, # like heartbeats, auto-commits, and metadata refreshes @@ -1045,6 +1045,8 @@ class KafkaConsumer(six.Iterator): if time.time() > timeout_at: log.debug("internal iterator timeout - breaking for poll") break + if self._client.in_flight_request_count(): + self._client.poll(timeout_ms=0) # An else block on a for loop only executes if there was no break # so this should only be called on a StopIteration from the fetcher diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 2974faf..ad59050 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -156,7 +156,7 @@ class Sender(threading.Thread): # difference between now and its linger expiry time; otherwise the # select time will be the time difference between now and the # metadata expiry time - self._client.poll(poll_timeout_ms, sleep=True) + self._client.poll(poll_timeout_ms) def initiate_close(self): """Start closing the sender (won't complete until all data is sent).""" diff --git a/test/test_client_async.py b/test/test_client_async.py index d4e6d37..ec45543 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -259,23 +259,22 @@ def test_poll(mocker): metadata.return_value = 1000 tasks.return_value = 2 cli.poll() - _poll.assert_called_with(1.0, sleep=True) + _poll.assert_called_with(1.0) # user timeout wins cli.poll(250) - _poll.assert_called_with(0.25, sleep=True) + _poll.assert_called_with(0.25) # tasks timeout wins tasks.return_value = 0 cli.poll(250) - _poll.assert_called_with(0, sleep=True) + _poll.assert_called_with(0) # default is request_timeout_ms metadata.return_value = 1000000 tasks.return_value = 10000 cli.poll() - _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0, - sleep=True) + _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) def test__poll(): @@ -337,8 +336,8 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(1.234, sleep=True) + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(1.234) def test_maybe_refresh_metadata_backoff(mocker, client): @@ -346,15 +345,15 @@ def test_maybe_refresh_metadata_backoff(mocker, client): t = mocker.patch('time.time') t.return_value = now - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(2.222) # reconnect backoff def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(9999.999) # request_timeout_ms def test_maybe_refresh_metadata_update(mocker, client): @@ -362,8 +361,8 @@ def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, '_can_send_request', return_value=True) send = mocker.patch.object(client, 'send') - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(9999.999) # request_timeout_ms assert client._metadata_refresh_in_progress request = MetadataRequest[0]([]) send.assert_called_once_with('foobar', request) @@ -379,16 +378,16 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): t.return_value = now # first poll attempts connection - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(2.222) # reconnect backoff client._can_connect.assert_called_once_with('foobar') client._maybe_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar') client._can_connect.reset_mock() - client.poll(timeout_ms=12345678, sleep=True) - client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout) + client.poll(timeout_ms=12345678) + client._poll.assert_called_with(9999.999) # connection timeout (request timeout) assert not client._can_connect.called assert not client._metadata_refresh_in_progress |