summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py21
1 files changed, 21 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 2b8f1c2..de2d385 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import ConnectionError
@@ -10,6 +11,26 @@ log = logging.getLogger("kafka")
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionnaly
+ randomize the returned list.
+ """
+
+ result = []
+ for host_port in hosts.split(","):
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else 9092
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
+
class KafkaConnection(local):
"""
A socket connection to a single Kafka broker