summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py28
1 files changed, 27 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index cc946fc..7dcd726 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionnaly
+ randomize the returned list.
+ """
+
+ if isinstance(hosts, str):
+ hosts = hosts.strip().split(',')
+
+ result = []
+ for host_port in hosts:
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
class KafkaConnection(local):
"""
@@ -84,7 +110,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error, e:
+ except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()