summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-09 11:19:32 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-09 11:19:32 -0700
commit385f2d80f945dac074f3998e3acc34531b13947a (patch)
treed8f0fb35d318cc1c0206613f295df8894af1a702 /kafka
parent12fae12ef2591b6129ed10431e6f4925682f7b1c (diff)
downloadkafka-python-385f2d80f945dac074f3998e3acc34531b13947a.tar.gz
Refactor away _get_conn_for_broker. Fix bug in _get_conn
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py19
1 files changed, 6 insertions, 13 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 39c89ba..65914a4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -45,23 +45,16 @@ class KafkaClient(object):
def _get_conn(self, host, port):
"Get or create a connection to a broker using host and port"
-
host_key = (host, port)
if host_key not in self.conns:
- self.conns[host_key] = KafkaConnection(host, port)
+ self.conns[host_key] = KafkaConnection(
+ host,
+ port,
+ timeout=self.timeout
+ )
return self.conns[host_key]
- def _get_conn_for_broker(self, broker):
- """
- Get or create a connection to a broker
- """
- if (broker.host, broker.port) not in self.conns:
- self.conns[(broker.host, broker.port)] = \
- KafkaConnection(broker.host, broker.port, timeout=self.timeout)
-
- return self._get_conn(broker.host, broker.port)
-
def _get_leader_for_partition(self, topic, partition):
"""
Returns the leader for a partition or None if the partition exists
@@ -151,7 +144,7 @@ class KafkaClient(object):
# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
- conn = self._get_conn_for_broker(broker)
+ conn = self._get_conn(broker.host, broker.port)
requestId = self._next_id()
request = encoder_fn(client_id=self.client_id,
correlation_id=requestId, payloads=payloads)