diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-06-19 13:41:59 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-06-19 13:41:59 -0700 |
commit | 91f4642e92afc208531f66cea1ed7ef32bcfa4d1 (patch) | |
tree | 82e08036d43a297c865d4766b4198bac951ee9bb /kafka/cluster.py | |
parent | f126e5bfcc8f41ee5ea29b41ec6eabbc3f441647 (diff) | |
download | kafka-python-91f4642e92afc208531f66cea1ed7ef32bcfa4d1.tar.gz |
Use dedicated connection for group coordinator (#1822)
This changes the coordinator_id to be a unique string, e.g., `coordinator-1`, so that it will get a dedicated connection. This won't eliminate lock contention because the client lock applies to all connections, but it should improve in-flight-request contention.
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 36 |
1 files changed, 14 insertions, 22 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 4169549..19137de 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -65,6 +65,7 @@ class ClusterMetadata(object): self.config[key] = configs[key] self._bootstrap_brokers = self._generate_bootstrap_brokers() + self._coordinator_brokers = {} def _generate_bootstrap_brokers(self): # collect_hosts does not perform DNS, so we should be fine to re-use @@ -96,7 +97,11 @@ class ClusterMetadata(object): Returns: BrokerMetadata or None if not found """ - return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id) + return ( + self._brokers.get(broker_id) or + self._bootstrap_brokers.get(broker_id) or + self._coordinator_brokers.get(broker_id) + ) def partitions_for_topic(self, topic): """Return set of all partitions for topic (whether available or not) @@ -341,41 +346,28 @@ class ClusterMetadata(object): response (GroupCoordinatorResponse): broker response Returns: - bool: True if metadata is updated, False on error + string: coordinator node_id if metadata is updated, None on error """ log.debug("Updating coordinator for %s: %s", group, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: log.error("GroupCoordinatorResponse error: %s", error_type) self._groups[group] = -1 - return False + return - node_id = response.coordinator_id + # Use a coordinator-specific node id so that group requests + # get a dedicated connection + node_id = 'coordinator-{}'.format(response.coordinator_id) coordinator = BrokerMetadata( - response.coordinator_id, + node_id, response.host, response.port, None) - # Assume that group coordinators are just brokers - # (this is true now, but could diverge in future) - if node_id not in self._brokers: - self._brokers[node_id] = coordinator - - # If this happens, either brokers have moved without - # changing IDs, or our assumption above is wrong - else: - node = self._brokers[node_id] - if coordinator.host != node.host or coordinator.port != node.port: - log.error("GroupCoordinator metadata conflicts with existing" - " broker metadata. Coordinator: %s, Broker: %s", - coordinator, node) - self._groups[group] = node_id - return False - log.info("Group coordinator for %s is %s", group, coordinator) + self._coordinator_brokers[node_id] = coordinator self._groups[group] = node_id - return True + return node_id def with_partitions(self, partitions_to_add): """Returns a copy of cluster metadata with partitions added""" |