summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py33
1 files changed, 26 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 14aebc6..614b1bb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionnaly
+ randomize the returned list.
+ """
+
+ result = []
+ for host_port in hosts.split(","):
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else 9092
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
+
class KafkaConnection(local):
"""
A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
- def __init__(self, host, port, bufsize=4096):
+ def __init__(self, host, port, bufsize=4096, timeout=10):
super(KafkaConnection, self).__init__()
self.host = host
self.port = port
self.bufsize = bufsize
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((host, port))
- self._sock.settimeout(10)
+ self.timeout = timeout
+
+ self._sock = socket.create_connection((host, port), timeout=timeout)
self._dirty = False
def __str__(self):
@@ -125,7 +146,5 @@ class KafkaConnection(local):
Re-initialize the socket connection
"""
self.close()
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self._sock.connect((self.host, self.port))
- self._sock.settimeout(10)
+ self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
self._dirty = False