summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorMark Roberts <markroberts@kixeye.com>2014-02-26 21:36:22 -0800
committerMark Roberts <markroberts@kixeye.com>2014-02-26 21:36:22 -0800
commit112158fa75879e2343c361c00a8aecb176dbb767 (patch)
treeb09b03781394c553ad902ca4a941865b74a76125 /kafka/conn.py
parent3b39d9d6589ee46ae4d31fb078f44019a2810983 (diff)
parentab89a44ecf9c93b116fcc8516cfc21749df74507 (diff)
downloadkafka-python-112158fa75879e2343c361c00a8aecb176dbb767.tar.gz
Merge branch 'master' into conn_refactor
Conflicts: example.py
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py28
1 files changed, 27 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index cc946fc..7dcd726 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -2,6 +2,7 @@ import copy
import logging
import socket
import struct
+from random import shuffle
from threading import local
from kafka.common import ConnectionError
@@ -9,6 +10,31 @@ from kafka.common import ConnectionError
log = logging.getLogger("kafka")
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+ """
+ Collects a comma-separated set of hosts (host:port) and optionnaly
+ randomize the returned list.
+ """
+
+ if isinstance(hosts, str):
+ hosts = hosts.strip().split(',')
+
+ result = []
+ for host_port in hosts:
+
+ res = host_port.split(':')
+ host = res[0]
+ port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+ result.append((host.strip(), port))
+
+ if randomize:
+ shuffle(result)
+
+ return result
+
class KafkaConnection(local):
"""
@@ -84,7 +110,7 @@ class KafkaConnection(local):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
- except socket.error, e:
+ except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()