diff options
author | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
---|---|---|
committer | mrtheb <mrlabbe@gmail.com> | 2014-03-17 15:43:00 -0400 |
commit | 19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e (patch) | |
tree | 421d3e2d628e8b564eecde6a4efcd4edac31d1ff /kafka/conn.py | |
parent | 828133cff064f4f8fba753183ac21619355ac005 (diff) | |
parent | 32edabdaaff6746e4926cc897b4bba917a80cb54 (diff) | |
download | kafka-python-19646b1f1f9fae38c3075a9a56b8e7a9d395ff8e.tar.gz |
Merge branch 'master' into develop
Conflicts:
test/test_unit.py
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 28 |
1 files changed, 27 insertions, 1 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 2b8f1c2..7266ae8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -2,6 +2,7 @@ import copy import logging import socket import struct +from random import shuffle from threading import local from kafka.common import ConnectionError @@ -9,6 +10,31 @@ from kafka.common import ConnectionError log = logging.getLogger("kafka") DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 +DEFAULT_KAFKA_PORT = 9092 + + +def collect_hosts(hosts, randomize=True): + """ + Collects a comma-separated set of hosts (host:port) and optionally + randomize the returned list. + """ + + if isinstance(hosts, str): + hosts = hosts.strip().split(',') + + result = [] + for host_port in hosts: + + res = host_port.split(':') + host = res[0] + port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT + result.append((host.strip(), port)) + + if randomize: + shuffle(result) + + return result + class KafkaConnection(local): """ @@ -81,7 +107,7 @@ class KafkaConnection(local): sent = self._sock.sendall(payload) if sent is not None: self._raise_connection_error() - except socket.error, e: + except socket.error: log.exception('Unable to send payload to Kafka') self._raise_connection_error() |