| Commit message (Collapse) | Author | Age | Files | Lines |
|
|
|
|
|
|
| |
This has been deprecated for a bit in favor of `KafkaConnectionError`
because it conflicts with Python's built-in `ConnectionError`.
Time to remove it as part of cleaning up our old deprecated code.
|
|
|
|
|
|
|
|
|
| |
The docs for `api_version_auto_timeout_ms` mention setting
`api_version='auto'` but that value has been deprecated for years in
favor of `api_version=None`.
Updating the docs for now, and will remove support for `'auto'` in next
major version bump.
|
| |
|
|
|
| |
If the cluster metadata object has no info about the topic, then issue a blocking metadata call to fetch it.
|
| |
|
|
|
|
|
| |
Use `futures` to parallelize calls to `_send_request_to_node()`
This allows queries that need to go to multiple brokers to be run in parallel.
|
|
|
| |
Now that the old zookeeper consumer has been completely deprecated/removed, these are no longer the "new consumer configs" but rather simply the "consumer configs"
|
|
|
|
|
|
| |
1. Remove unused variable: `partitions_for_topic`
2. No need to cast to list as `sorted()` already returns a list
3. Using `enumerate()` is cleaner than `range(len())` and handles assigning
`member`
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
|
|
| |
There was a very small possibility that between checking `self._can_send_request(node_id)` and grabbing the connection object via `self._conns[node_id]` that the connection could get closed / recycled / removed from _conns and cause a KeyError. This PR should prevent such a KeyError. In the case where the connection is disconnected by the time we call send(), we should expect conn.send() simply to fail the request.
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
|
|
|
|
|
|
| |
The current client attempts to bootstrap once during initialization, but if it fails there is no second attempt and the client will be inoperable. This can happen, for example, if an entire cluster is down at the time a long-running client starts execution.
This commit attempts to fix this by removing the synchronous bootstrapping from `KafkaClient` init, and instead merges bootstrap metadata with the cluster metadata. The Java client uses a similar approach. This allows us to continue falling back to bootstrap data when necessary throughout the life of a long-running consumer or producer.
Fix #1670
|
| |
|
| |
|
| |
|
|
|
| |
I noticed during local testing that version probing was happening twice when connecting to newer broker versions. This was because we call check_version() once explicitly, and then again implicitly within get_api_versions(). But once we have _api_versions data cached, we can just return it and avoid probing versions a second time.
|
|
|
|
|
|
|
| |
Underlying issue here is a race on consumer.close() between the client, the connections/sockets, and the heartbeat thread. Although the heartbeat thread is signaled to close, we do not block for it. So when we go on to close the client and its underlying connections, if the heartbeat is still doing work it can cause errors/crashes if it attempts to access the now closed objects (selectors and/or sockets, primarily).
So this commit adds a blocking thread join to the heartbeat close. This may cause some additional blocking time on consumer.close() while the heartbeat thread finishes. But it should be small in average case and in the worst case will be no longer than the heartbeat_timeout_ms (though if we timeout the join, race errors may still occur).
Fix #1666
|
| |
|
| |
|
| |
|
| |
|
|
|
|
|
|
| |
* Add BrokerConnection.send_pending_requests to support async network sends
* Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
* Dont acquire lock during KafkaClient.send if node is connected / ready
* Move all network connection IO into KafkaClient.poll()
|
|
|
| |
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
|
| |
|
| |
|
|
|
|
| |
stable group (#1695)
|
|
|
|
|
|
|
|
|
|
| |
This `skip_double_compressed_messages` flag was added in https://github.com/dpkp/kafka-python/pull/755 in
order to fix https://github.com/dpkp/kafka-python/issues/718.
However, grep'ing through the code, it looks like it this is no longer
used anywhere and doesn't do anything.
So removing it.
|
| |
|
|
|
|
|
|
|
| |
`getattr(object, 'x', object.y)` will evaluate the default argument
`object.y` regardless of whether `'x'` exists.
For details see: https://stackoverflow.com/q/31443989/770425
|
| |
|