diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 36 |
1 files changed, 36 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 9514e48..0602d70 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -8,6 +8,8 @@ from threading import local import six from kafka.common import ConnectionError +from kafka.protocol.api import RequestHeader +from kafka.protocol.types import Int32 log = logging.getLogger(__name__) @@ -16,6 +18,40 @@ DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 +class BrokerConnection(local): + def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): + super(BrokerConnection, self).__init__() + self.host = host + self.port = port + self.timeout = timeout + self._sock = socket.create_connection((host, port), timeout) + self.fd = self._sock.makefile(mode='+') + self.correlation_id = 0 + + def close(self): + self.fd.close() + self._sock.close() + + def send(self, request): + self.correlation_id += 1 + header = RequestHeader(request, correlation_id=self.correlation_id) + message = b''.join([header.encode(), request.encode()]) + size = Int32.encode(len(message)) + self.fd.write(size) + self.fd.write(message) + self.fd.flush() + + size = Int32.decode(self.fd) + correlation_id = Int32.decode(self.fd) + return request.RESPONSE_TYPE.decode(self.fd) + + def __getnewargs__(self): + return (self.host, self.port, self.timeout) + + def __repr__(self): + return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) + + def collect_hosts(hosts, randomize=True): """ Collects a comma-separated set of hosts (host:port) and optionally |