summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py36
1 files changed, 36 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 9514e48..0602d70 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -8,6 +8,8 @@ from threading import local
import six
from kafka.common import ConnectionError
+from kafka.protocol.api import RequestHeader
+from kafka.protocol.types import Int32
log = logging.getLogger(__name__)
@@ -16,6 +18,40 @@ DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
+class BrokerConnection(local):
+ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
+ super(BrokerConnection, self).__init__()
+ self.host = host
+ self.port = port
+ self.timeout = timeout
+ self._sock = socket.create_connection((host, port), timeout)
+ self.fd = self._sock.makefile(mode='+')
+ self.correlation_id = 0
+
+ def close(self):
+ self.fd.close()
+ self._sock.close()
+
+ def send(self, request):
+ self.correlation_id += 1
+ header = RequestHeader(request, correlation_id=self.correlation_id)
+ message = b''.join([header.encode(), request.encode()])
+ size = Int32.encode(len(message))
+ self.fd.write(size)
+ self.fd.write(message)
+ self.fd.flush()
+
+ size = Int32.decode(self.fd)
+ correlation_id = Int32.decode(self.fd)
+ return request.RESPONSE_TYPE.decode(self.fd)
+
+ def __getnewargs__(self):
+ return (self.host, self.port, self.timeout)
+
+ def __repr__(self):
+ return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)
+
+
def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally