| Commit message (Collapse) | Author | Age | Files | Lines |
| | |
|
| | |
|
| |
|
| |
There was a very small possibility that between checking `self._can_send_request(node_id)` and grabbing the connection object via `self._conns[node_id]` that the connection could get closed / recycled / removed from _conns and cause a KeyError. This PR should prevent such a KeyError. In the case where the connection is disconnected by the time we call send(), we should expect conn.send() simply to fail the request.
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| |
|
|
|
|
|
| |
The current client attempts to bootstrap once during initialization, but if it fails there is no second attempt and the client will be inoperable. This can happen, for example, if an entire cluster is down at the time a long-running client starts execution.
This commit attempts to fix this by removing the synchronous bootstrapping from `KafkaClient` init, and instead merges bootstrap metadata with the cluster metadata. The Java client uses a similar approach. This allows us to continue falling back to bootstrap data when necessary throughout the life of a long-running consumer or producer.
Fix #1670
|
| | |
|
| | |
|
| | |
|
| |
|
| |
I noticed during local testing that version probing was happening twice when connecting to newer broker versions. This was because we call check_version() once explicitly, and then again implicitly within get_api_versions(). But once we have _api_versions data cached, we can just return it and avoid probing versions a second time.
|
| |
|
|
|
|
|
| |
Underlying issue here is a race on consumer.close() between the client, the connections/sockets, and the heartbeat thread. Although the heartbeat thread is signaled to close, we do not block for it. So when we go on to close the client and its underlying connections, if the heartbeat is still doing work it can cause errors/crashes if it attempts to access the now closed objects (selectors and/or sockets, primarily).
So this commit adds a blocking thread join to the heartbeat close. This may cause some additional blocking time on consumer.close() while the heartbeat thread finishes. But it should be small in average case and in the worst case will be no longer than the heartbeat_timeout_ms (though if we timeout the join, race errors may still occur).
Fix #1666
|
| | |
|
| | |
|
| | |
|
| | |
|
| |
|
|
|
|
| |
* Add BrokerConnection.send_pending_requests to support async network sends
* Send network requests during KafkaClient.poll() rather than in KafkaClient.send()
* Dont acquire lock during KafkaClient.send if node is connected / ready
* Move all network connection IO into KafkaClient.poll()
|
| |
|
| |
In an effort to reduce the surface area of lock coordination, and thereby hopefully reduce lock contention, I think we can remove locking from the read-only KafkaClient methods: connected, is_disconnected, in_flight_request_count, and least_loaded_node . Given that the read data could change after the lock is released but before the caller uses it, the value of acquiring a lock here does not seem high to me.
|
| | |
|
| | |
|
| |
|
|
| |
stable group (#1695)
|
| |
|
|
|
|
|
|
|
|
| |
This `skip_double_compressed_messages` flag was added in https://github.com/dpkp/kafka-python/pull/755 in
order to fix https://github.com/dpkp/kafka-python/issues/718.
However, grep'ing through the code, it looks like it this is no longer
used anywhere and doesn't do anything.
So removing it.
|
| | |
|
| |
|
|
|
|
|
| |
`getattr(object, 'x', object.y)` will evaluate the default argument
`object.y` regardless of whether `'x'` exists.
For details see: https://stackoverflow.com/q/31443989/770425
|
| | |
|
| |
|
|
|
|
|
|
|
|
|
|
| |
Previously we weren't accounting for when the response tuple also has a
`error_message` value.
Note that in Java, the error fieldname is inconsistent:
- `CreateTopicsResponse` / `CreatePartitionsResponse` uses `topic_errors`
- `DeleteTopicsResponse` uses `topic_error_codes`
So this updates the `CreateTopicsResponse` classes to match.
The fix is a little brittle, but should suffice for now.
|
| | |
|
| |
|
| |
`isinstance()` won't work here, as the types require identity comparison.
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| | |
|
| |
|
|
|
|
|
|
|
|
|
|
|
| |
This was completely broken previously because it didn't lookup the group
coordinator of the consumer group. Also added basic error
handling/raising.
Note:
I added the `group_coordinator_id` as an optional kwarg. As best I
can tell, the Java client doesn't include this and instead looks it up
every time. However, if we add this, it allows the caller the
flexibility to bypass the network round trip of the lookup if for some
reason they already know the `group_coordinator_id`.
|
| |
|
|
|
|
|
| |
Set a clear default value for `validate_only` / `include_synonyms`
Previously the kwarg defaulted to `None`, but then sent a `False` so this
makes it more explicit and reduces ambiguity.
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
Previously, this only queried the controller. In actuality, the Kafka
protocol requires that the client query all brokers in order to get the
full list of consumer groups.
Note: The Java code (as best I can tell) doesn't allow limiting this to
specific brokers. And on the surface, this makes sense... you typically
don't care about specific brokers.
However, the inverse is true... consumer groups care about knowing their
group coordinator so they don't have to repeatedly query to find it.
In fact, a Kafka broker will only return the groups that it's a
coordinator for. While this is an implementation detail that is not
guaranteed by the upstream broker code, and technically should not be
relied upon, I think it very unlikely to change.
So monitoring scripts that fetch the offsets or describe the consumers
groups of all groups in the cluster can simply issue one call per broker
to identify all the coordinators, rather than having to issue one call
per consumer group. For an ad-hoc script this doesn't matter, but for
a monitoring script that runs every couple of minutes, this can be a big
deal. I know in the situations where I will use this, this matters more
to me than the risk of the interface unexpectedly breaking.
|
| |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| |
Support fetching the offsets of a consumer group.
Note: As far as I can tell (the Java code is a little inscrutable), the
Java AdminClient doesn't allow specifying the `coordinator_id` or the
`partitions`.
But I decided to include them because they provide a lot of additional
flexibility:
1. allowing users to specify the partitions allows this method to be used even for
older brokers that don't support the OffsetFetchRequest_v2
2. allowing users to specify the coordinator ID gives them a way to
bypass a network round trip. This method will frequently be used for
monitoring, and if you've got 1,000 consumer groups that are being
monitored once a minute, that's ~1.5M requests a day that are
unnecessarily duplicated as the coordinator doesn't change unless
there's an error.
|